Skip to content

Commit

Permalink
Subscribe core method (#205)
Browse files Browse the repository at this point in the history
Expose slightly lower level API for subscriptions to make it easier
to build custom messaging patterns.
  • Loading branch information
mtmk authored Nov 13, 2023
1 parent 240a7e5 commit c2ef9e9
Show file tree
Hide file tree
Showing 19 changed files with 129 additions and 45 deletions.
28 changes: 28 additions & 0 deletions docs/documentation/core/pub-sub.md
Original file line number Diff line number Diff line change
Expand Up @@ -29,3 +29,31 @@ await nats.PublishAsync<int>("foo", -1);

await sub;
```

## Subscriptions with Lower Level Control

The `SubscribeAsync()` method is a convenient way to subscribe to a subject and receive messages without much effort.
If you need more control over how subscription is handled, you can use the `SubscribeCoreAsync()` method instead.

```csharp
await using var nats = new NatsConnection();

await using sub = await nats.SubscribeAsync<int>("foo");

for (int i = 0; i < 10; i++)
{
Console.WriteLine($" Publishing {i}...");
await nats.PublishAsync<int>("foo", i);
}

await nats.PublishAsync<int>("foo", -1);

await foreach (var msg in sub.Msgs.ReadAllAsync())
{
Console.WriteLine($"Received {msg.Subject}: {msg.Data}\n");
if (msg.Data == -1)
break;
}

await sub.UnsubscribeAsync();
```
25 changes: 25 additions & 0 deletions src/NATS.Client.Core/INatsConnection.cs
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,31 @@ public interface INatsConnection
/// </remarks>
IAsyncEnumerable<NatsMsg<T>> SubscribeAsync<T>(string subject, string? queueGroup = default, INatsDeserialize<T>? serializer = default, NatsSubOpts? opts = default, CancellationToken cancellationToken = default);

/// <summary>
/// Initiates a subscription to a subject, optionally joining a distributed queue group
/// and returns a <see cref="INatsSub{T}"/> object which provides more control over the subscription.
/// </summary>
/// <param name="subject">The subject name to subscribe to.</param>
/// <param name="queueGroup">If specified, the subscriber will join this queue group.</param>
/// <param name="serializer">Serializer to use for the message type.</param>
/// <param name="opts">A <see cref="NatsSubOpts"/> for subscription options.</param>
/// <param name="cancellationToken">A <see cref="CancellationToken"/> used to cancel the command.</param>
/// <typeparam name="T">Specifies the type of data that may be received from the NATS Server.</typeparam>
/// <returns>An asynchronous task that completes with the NATS subscription.</returns>
/// <remarks>
/// <para>
/// Subscribers with the same queue group name, become a queue group,
/// and only one randomly chosen subscriber of the queue group will
/// consume a message each time a message is received by the queue group.
/// </para>
/// <para>
/// This method returns a <see cref="INatsSub{T}"/> object which provides slightly lower level
/// control over the subscription. You can use this object to create your own core messaging
/// patterns or to create your own higher level abstractions.
/// </para>
/// </remarks>
ValueTask<INatsSub<T>> SubscribeCoreAsync<T>(string subject, string? queueGroup = default, INatsDeserialize<T>? serializer = default, NatsSubOpts? opts = default, CancellationToken cancellationToken = default);

/// <summary>
/// Create a new inbox subject with the form {Inbox Prefix}.{Unique Connection ID}.{Unique Inbox ID}
/// </summary>
Expand Down
30 changes: 30 additions & 0 deletions src/NATS.Client.Core/INatsSub.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
using System.Threading.Channels;

namespace NATS.Client.Core;

public interface INatsSub<T> : IAsyncDisposable
{
/// <summary>
/// Access incoming messages for your subscription.
/// </summary>
ChannelReader<NatsMsg<T>> Msgs { get; }

/// <summary>
/// The subject name to subscribe to.
/// </summary>
string Subject { get; }

/// <summary>
/// If specified, the subscriber will join this queue group. Subscribers with the same queue group name,
/// become a queue group, and only one randomly chosen subscriber of the queue group will
/// consume a message each time a message is received by the queue group.
/// </summary>
string? QueueGroup { get; }

/// <summary>
/// Complete the message channel, stop timers if they were used and send an unsubscribe
/// message to the server.
/// </summary>
/// <returns>A <see cref="ValueTask"/> that represents the asynchronous server UNSUB operation.</returns>
public ValueTask UnsubscribeAsync();
}
3 changes: 2 additions & 1 deletion src/NATS.Client.Core/NatsConnection.Subscribe.cs
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,8 @@ public async IAsyncEnumerable<NatsMsg<T>> SubscribeAsync<T>(string subject, stri
}
}

internal async ValueTask<NatsSub<T>> SubscribeInternalAsync<T>(string subject, string? queueGroup = default, INatsDeserialize<T>? serializer = default, NatsSubOpts? opts = default, CancellationToken cancellationToken = default)
/// <inheritdoc />
public async ValueTask<INatsSub<T>> SubscribeCoreAsync<T>(string subject, string? queueGroup = default, INatsDeserialize<T>? serializer = default, NatsSubOpts? opts = default, CancellationToken cancellationToken = default)
{
serializer ??= Opts.SerializerRegistry.GetDeserializer<T>();
var sub = new NatsSub<T>(this, SubscriptionManager.GetManagerFor(subject), subject, queueGroup, opts, serializer, cancellationToken);
Expand Down
2 changes: 1 addition & 1 deletion src/NATS.Client.Core/NatsSubUtils.cs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@

namespace NATS.Client.Core;

public sealed class NatsSub<T> : NatsSubBase
public sealed class NatsSub<T> : NatsSubBase, INatsSub<T>
{
private readonly Channel<NatsMsg<T>> _msgs;

Expand Down
2 changes: 1 addition & 1 deletion tests/NATS.Client.CheckNativeAot/Program.cs
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ async Task RequestReplyTests()
await using var server = NatsServer.Start();
await using var nats = server.CreateClientConnection();

var sub = await nats.SubscribeInternalAsync<int>("foo");
var sub = await nats.SubscribeCoreAsync<int>("foo");
var reg = sub.Register(async msg =>
{
await msg.ReplyAsync(msg.Data * 2);
Expand Down
2 changes: 1 addition & 1 deletion tests/NATS.Client.Core.MemoryTests/NatsSubTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ public void Subject_manager_should_not_hold_on_to_subscription_if_collected()
async Task Isolator()
{
// Subscription is not being disposed here
var natsSub = await nats.SubscribeInternalAsync<string>("foo");
var natsSub = await nats.SubscribeCoreAsync<string>("foo");
Assert.That(natsSub.Subject, Is.EqualTo("foo"));
}

Expand Down
2 changes: 1 addition & 1 deletion tests/NATS.Client.Core.Tests/CancellationTest.cs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ public async Task CommandTimeoutTest()
await using var pubConnection = server.CreateClientConnection(NatsOpts.Default with { CommandTimeout = TimeSpan.FromSeconds(1) });
await pubConnection.ConnectAsync();

await subConnection.SubscribeInternalAsync<string>("foo");
await subConnection.SubscribeCoreAsync<string>("foo");

var cmd = new SleepWriteCommand("PUB foo 5\r\naiueo", TimeSpan.FromSeconds(10));
pubConnection.PostDirectWrite(cmd);
Expand Down
2 changes: 1 addition & 1 deletion tests/NATS.Client.Core.Tests/JsonSerializerTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ public async Task Serialize_any_type()
var cts = new CancellationTokenSource(TimeSpan.FromSeconds(10));
var cancellationToken = cts.Token;

await using var sub = await nats.SubscribeInternalAsync<SomeTestData>("foo", cancellationToken: cancellationToken);
await using var sub = await nats.SubscribeCoreAsync<SomeTestData>("foo", cancellationToken: cancellationToken);
await nats.PingAsync(cancellationToken);
await nats.PublishAsync("foo", new SomeTestData { Name = "bar" }, cancellationToken: cancellationToken);

Expand Down
2 changes: 1 addition & 1 deletion tests/NATS.Client.Core.Tests/NatsConnectionTest.Auth.cs
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,7 @@ public async Task UserCredentialAuthTest(Auth auth)
var signalComplete2 = new WaitSignal();

var syncCount = 0;
var natsSub = await subConnection.SubscribeInternalAsync<int>(subject);
var natsSub = await subConnection.SubscribeCoreAsync<int>(subject);
var register = natsSub.Register(x =>
{
Interlocked.Increment(ref syncCount);
Expand Down
2 changes: 1 addition & 1 deletion tests/NATS.Client.Core.Tests/NatsConnectionTest.Headers.cs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ public async Task HeaderParsingTest()
var sync = 0;
var signal1 = new WaitSignal<NatsMsg<int>>();
var signal2 = new WaitSignal<NatsMsg<int>>();
var sub = await nats.SubscribeInternalAsync<int>("foo");
var sub = await nats.SubscribeCoreAsync<int>("foo");
var reg = sub.Register(m =>
{
if (m.Data < 10)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,8 @@ public async Task QueueGroupsTest()
await using var conn2 = server.CreateClientConnection();
await using var conn3 = server.CreateClientConnection();

var sub1 = await conn1.SubscribeInternalAsync<int>("foo.*", queueGroup: "my-group");
var sub2 = await conn2.SubscribeInternalAsync<int>("foo.*", queueGroup: "my-group");
var sub1 = await conn1.SubscribeCoreAsync<int>("foo.*", queueGroup: "my-group");
var sub2 = await conn2.SubscribeCoreAsync<int>("foo.*", queueGroup: "my-group");

var signal = new WaitSignal();
var cts = new CancellationTokenSource();
Expand Down
10 changes: 5 additions & 5 deletions tests/NATS.Client.Core.Tests/NatsConnectionTest.cs
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ public async Task SimplePubSubTest()
var signalComplete = new WaitSignal();

var list = new List<int>();
var sub = await subConnection.SubscribeInternalAsync<int>(subject);
var sub = await subConnection.SubscribeCoreAsync<int>(subject);
var register = sub.Register(x =>
{
_output.WriteLine($"Received: {x.Data}");
Expand Down Expand Up @@ -70,7 +70,7 @@ public async Task EncodingTest()

var actual = new List<SampleClass?>();
var signalComplete = new WaitSignal();
var sub = await subConnection.SubscribeInternalAsync<SampleClass>(key);
var sub = await subConnection.SubscribeCoreAsync<SampleClass>(key);
var register = sub.Register(x =>
{
actual.Add(x.Data);
Expand Down Expand Up @@ -109,7 +109,7 @@ public async Task RequestTest(int minSize)
var text = new StringBuilder(minSize).Insert(0, "a", minSize).ToString();

var sync = 0;
var sub = await subConnection.SubscribeInternalAsync<int>(subject);
var sub = await subConnection.SubscribeCoreAsync<int>(subject);
var reg = sub.Register(async m =>
{
if (m.Data < 10)
Expand Down Expand Up @@ -168,7 +168,7 @@ public async Task ReconnectSingleTest()
var sync = 0;
var waitForReceive300 = new WaitSignal();
var waitForReceiveFinish = new WaitSignal();
var sub = await subConnection.SubscribeInternalAsync<int>(subject);
var sub = await subConnection.SubscribeCoreAsync<int>(subject);
var reg = sub.Register(x =>
{
if (x.Data < 10)
Expand Down Expand Up @@ -263,7 +263,7 @@ public async Task ReconnectClusterTest()
var sync = 0;
var waitForReceive300 = new WaitSignal();
var waitForReceiveFinish = new WaitSignal();
var sub = await connection1.SubscribeInternalAsync<int>(subject);
var sub = await connection1.SubscribeCoreAsync<int>(subject);
var reg = sub.Register(x =>
{
if (x.Data < 10)
Expand Down
14 changes: 7 additions & 7 deletions tests/NATS.Client.Core.Tests/ProtocolTest.cs
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,9 @@ public async Task Subscription_with_same_subject()
var nats1 = server.CreateClientConnection();
var (nats2, proxy) = server.CreateProxiedClientConnection();

var sub1 = await nats2.SubscribeInternalAsync<int>("foo.bar");
var sub2 = await nats2.SubscribeInternalAsync<int>("foo.bar");
var sub3 = await nats2.SubscribeInternalAsync<int>("foo.baz");
var sub1 = await nats2.SubscribeCoreAsync<int>("foo.bar");
var sub2 = await nats2.SubscribeCoreAsync<int>("foo.bar");
var sub3 = await nats2.SubscribeCoreAsync<int>("foo.baz");

var sync1 = 0;
var sync2 = 0;
Expand Down Expand Up @@ -114,7 +114,7 @@ void Log(string text)
var sync = 0;
var signal1 = new WaitSignal<NatsMsg<int>>();
var signal2 = new WaitSignal<NatsMsg<int>>();
var sub = await nats.SubscribeInternalAsync<int>("foo.*");
var sub = await nats.SubscribeCoreAsync<int>("foo.*");
var reg = sub.Register(m =>
{
switch (m.Subject)
Expand Down Expand Up @@ -182,7 +182,7 @@ void Log(string text)
Log("### Auto-unsubscribe after consuming max-msgs");
{
var opts = new NatsSubOpts { MaxMsgs = maxMsgs };
await using var sub = await nats.SubscribeInternalAsync<int>("foo", opts: opts);
await using var sub = await nats.SubscribeCoreAsync<int>("foo", opts: opts);
sid++;

await Retry.Until("all frames arrived", () => proxy.Frames.Count >= 2);
Expand Down Expand Up @@ -212,7 +212,7 @@ void Log(string text)
{
await proxy.FlushFramesAsync(nats);

await using var sub = await nats.SubscribeInternalAsync<int>("foo2");
await using var sub = await nats.SubscribeCoreAsync<int>("foo2");
sid++;
await sub.UnsubscribeAsync();

Expand Down Expand Up @@ -246,7 +246,7 @@ void Log(string text)
proxy.Reset();

var opts = new NatsSubOpts { MaxMsgs = maxMsgs };
var sub = await nats.SubscribeInternalAsync<int>("foo3", opts: opts);
var sub = await nats.SubscribeCoreAsync<int>("foo3", opts: opts);
sid++;
var count = 0;
var reg = sub.Register(_ => Interlocked.Increment(ref count));
Expand Down
18 changes: 9 additions & 9 deletions tests/NATS.Client.Core.Tests/RequestReplyTest.cs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ public async Task Simple_request_reply_test()
var cts = new CancellationTokenSource(TimeSpan.FromSeconds(30));
var cancellationToken = cts.Token;

var sub = await nats.SubscribeInternalAsync<int>(subject, cancellationToken: cancellationToken);
var sub = await nats.SubscribeCoreAsync<int>(subject, cancellationToken: cancellationToken);
var reg = sub.Register(async msg =>
{
await msg.ReplyAsync(msg.Data * 2, cancellationToken: cancellationToken);
Expand Down Expand Up @@ -74,7 +74,7 @@ public async Task Request_reply_many_test()
await using var server = NatsServer.Start();
await using var nats = server.CreateClientConnection();

var sub = await nats.SubscribeInternalAsync<int>("foo");
var sub = await nats.SubscribeCoreAsync<int>("foo");
var reg = sub.Register(async msg =>
{
for (var i = 0; i < msgs; i++)
Expand Down Expand Up @@ -106,7 +106,7 @@ public async Task Request_reply_many_test_overall_timeout()
await using var server = NatsServer.Start();
await using var nats = server.CreateClientConnection();

var sub = await nats.SubscribeInternalAsync<int>("foo");
var sub = await nats.SubscribeCoreAsync<int>("foo");
var reg = sub.Register(async msg =>
{
await msg.ReplyAsync(msg.Data * 2);
Expand Down Expand Up @@ -137,7 +137,7 @@ public async Task Request_reply_many_test_idle_timeout()
await using var server = NatsServer.Start();
await using var nats = server.CreateClientConnection();

var sub = await nats.SubscribeInternalAsync<int>("foo");
var sub = await nats.SubscribeCoreAsync<int>("foo");
var reg = sub.Register(async msg =>
{
await msg.ReplyAsync(msg.Data * 2);
Expand Down Expand Up @@ -171,7 +171,7 @@ public async Task Request_reply_many_test_start_up_timeout()
await using var server = NatsServer.Start();
await using var nats = server.CreateClientConnection();

var sub = await nats.SubscribeInternalAsync<int>("foo");
var sub = await nats.SubscribeCoreAsync<int>("foo");
var reg = sub.Register(async msg =>
{
await Task.Delay(2_000);
Expand Down Expand Up @@ -201,7 +201,7 @@ public async Task Request_reply_many_test_max_count()
await using var server = NatsServer.Start();
await using var nats = server.CreateClientConnection();

var sub = await nats.SubscribeInternalAsync<int>("foo");
var sub = await nats.SubscribeCoreAsync<int>("foo");
var reg = sub.Register(async msg =>
{
await msg.ReplyAsync(msg.Data * 2);
Expand Down Expand Up @@ -234,7 +234,7 @@ public async Task Request_reply_many_test_sentinel()
await using var server = NatsServer.Start();
await using var nats = server.CreateClientConnection();

var sub = await nats.SubscribeInternalAsync<int>("foo");
var sub = await nats.SubscribeCoreAsync<int>("foo");
var reg = sub.Register(async msg =>
{
await msg.ReplyAsync(msg.Data * 2);
Expand Down Expand Up @@ -269,7 +269,7 @@ static string ToStr(ReadOnlyMemory<byte> input)

await using var server = NatsServer.Start();
await using var nats = server.CreateClientConnection();
await using var sub = await nats.SubscribeInternalAsync<string>("foo", cancellationToken: cts.Token);
await using var sub = await nats.SubscribeCoreAsync<string>("foo", cancellationToken: cts.Token);
var reg = sub.Register(async m =>
{
if (m.Data == "1")
Expand Down Expand Up @@ -304,7 +304,7 @@ public async Task Request_reply_many_multiple_with_timeout_test()
var cts = new CancellationTokenSource(TimeSpan.FromSeconds(30));
var cancellationToken = cts.Token;

var sub = await nats.SubscribeInternalAsync<int>(subject, cancellationToken: cancellationToken);
var sub = await nats.SubscribeCoreAsync<int>(subject, cancellationToken: cancellationToken);
var reg = sub.Register(async msg =>
{
await msg.ReplyAsync(msg.Data * 2, cancellationToken: cancellationToken);
Expand Down
2 changes: 1 addition & 1 deletion tests/NATS.Client.Core.Tests/SerializerTest.cs
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ await nats.PublishAsync(
});

// Check that our connection isn't affected by the exceptions
await using var sub = await nats.SubscribeInternalAsync<int>("foo");
await using var sub = await nats.SubscribeCoreAsync<int>("foo");

var rtt = await nats.PingAsync();
Assert.True(rtt > TimeSpan.Zero);
Expand Down
Loading

0 comments on commit c2ef9e9

Please sign in to comment.