Skip to content

Commit

Permalink
Services tests fix an tidy-up (#189)
Browse files Browse the repository at this point in the history
Issue with FindServices() call was also happening in native AOT checks.
  • Loading branch information
mtmk authored Nov 7, 2023
1 parent 81634ec commit 4a7fd02
Show file tree
Hide file tree
Showing 3 changed files with 58 additions and 81 deletions.
35 changes: 5 additions & 30 deletions tests/NATS.Client.CheckNativeAot/Program.cs
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
using System.Text;
using System.Text.Json.Nodes;
using NATS.Client.Core;
using NATS.Client.Core.Tests;
using NATS.Client.JetStream;
using NATS.Client.JetStream.Models;
Expand Down Expand Up @@ -317,7 +316,7 @@ await grp2.AddEndpointAsync<int>(

// Check that the endpoints are registered correctly
{
var info = (await FindServices<InfoResponse>(nats, "$SRV.INFO.s1", 1, cancellationToken)).First();
var info = (await nats.FindServicesAsync("$SRV.INFO.s1", 1, NatsSrvJsonSerializer<InfoResponse>.Default, cancellationToken)).First();
Assert.Equal(5, info.Endpoints.Count);

Assert.Equal("foo.baz", info.Endpoints.First(e => e.Name == "baz").Subject);
Expand Down Expand Up @@ -355,7 +354,7 @@ await s2.AddEndpointAsync<int>(

// Check default queue group and stats handler
{
var info = (await FindServices<InfoResponse>(nats, "$SRV.INFO.s2", 1, cancellationToken)).First();
var info = (await nats.FindServicesAsync("$SRV.INFO.s2", 1, NatsSrvJsonSerializer<InfoResponse>.Default, cancellationToken)).First();
Assert.Single(info.Endpoints);
var epi = info.Endpoints.First();

Expand All @@ -364,7 +363,7 @@ await s2.AddEndpointAsync<int>(
Assert.Equal("q2", epi.QueueGroup);
Assert.Equal("ep-v1", epi.Metadata["ep-k1"]);

var stat = (await FindServices<StatsResponse>(nats, "$SRV.STATS.s2", 1, cancellationToken)).First();
var stat = (await nats.FindServicesAsync("$SRV.STATS.s2", 1, NatsSrvJsonSerializer<StatsResponse>.Default, cancellationToken)).First();
Assert.Equal("v1", stat.Metadata["k1"]);
Assert.Equal("v2", stat.Metadata["k2"]);
Assert.Single(stat.Endpoints);
Expand Down Expand Up @@ -414,7 +413,7 @@ await s1.AddEndpointAsync<int>(
},
cancellationToken: cancellationToken);

var info = (await FindServices<InfoResponse>(nats, "$SRV.INFO", 1, cancellationToken)).First();
var info = (await nats.FindServicesAsync("$SRV.INFO", 1, NatsSrvJsonSerializer<InfoResponse>.Default, cancellationToken)).First();
Assert.Single(info.Endpoints);
var endpointInfo = info.Endpoints.First();
Assert.Equal("e1", endpointInfo.Name);
Expand All @@ -439,7 +438,7 @@ await s1.AddEndpointAsync<int>(
}
}

var stat = (await FindServices<StatsResponse>(nats, "$SRV.STATS", 1, cancellationToken)).First();
var stat = (await nats.FindServicesAsync("$SRV.STATS", 1, NatsSrvJsonSerializer<StatsResponse>.Default, cancellationToken)).First();
Assert.Single(stat.Endpoints);
var endpointStats = stat.Endpoints.First();
Assert.Equal("e1", endpointStats.Name);
Expand All @@ -451,27 +450,3 @@ await s1.AddEndpointAsync<int>(

Log("OK");
}

static async Task<List<T>> FindServices<T>(NatsConnection nats, string subject, int limit, CancellationToken ct)
{
var replyOpts = new NatsSubOpts
{
Timeout = TimeSpan.FromSeconds(2),
};
var responses = new List<T>();

var count = 0;
await foreach (var msg in nats.RequestManyAsync<object?, T>(subject, null, replySerializer: NatsSrvJsonSerializer<T>.Default, replyOpts: replyOpts, cancellationToken: ct).ConfigureAwait(false))
{
responses.Add(msg.Data!);
if (++count == limit)
break;
}

if (count != limit)
{
throw new Exception($"Find service error: Expected {limit} responses but got {count}");
}

return responses;
}
65 changes: 14 additions & 51 deletions tests/NATS.Client.Services.Tests/ServicesTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -23,9 +23,9 @@ public async Task Add_service_listeners_ping_info_and_stats()

await using var s1 = await svc.AddServiceAsync("s1", "1.0.0", cancellationToken: cancellationToken);

var pingsTask = FindServices<PingResponse>(nats, "$SRV.PING", 1, cancellationToken);
var infosTask = FindServices<InfoResponse>(nats, "$SRV.INFO", 1, cancellationToken);
var statsTask = FindServices<StatsResponse>(nats, "$SRV.STATS", 1, cancellationToken);
var pingsTask = nats.FindServicesAsync("$SRV.PING", 1, NatsSrvJsonSerializer<PingResponse>.Default, cancellationToken);
var infosTask = nats.FindServicesAsync("$SRV.INFO", 1, NatsSrvJsonSerializer<InfoResponse>.Default, cancellationToken);
var statsTask = nats.FindServicesAsync("$SRV.STATS", 1, NatsSrvJsonSerializer<StatsResponse>.Default, cancellationToken);

var pings = await pingsTask;
pings.ForEach(x => _output.WriteLine($"{x}"));
Expand Down Expand Up @@ -82,7 +82,7 @@ await s1.AddEndpointAsync<int>(
},
cancellationToken: cancellationToken);

var info = (await FindServices<InfoResponse>(nats, "$SRV.INFO", 1, cancellationToken)).First();
var info = (await nats.FindServicesAsync("$SRV.INFO", 1, NatsSrvJsonSerializer<InfoResponse>.Default, cancellationToken)).First();
Assert.Single(info.Endpoints);
var endpointInfo = info.Endpoints.First();
Assert.Equal("e1", endpointInfo.Name);
Expand All @@ -107,7 +107,7 @@ await s1.AddEndpointAsync<int>(
}
}

var stat = (await FindServices<StatsResponse>(nats, "$SRV.STATS", 1, cancellationToken)).First();
var stat = (await nats.FindServicesAsync("$SRV.STATS", 1, NatsSrvJsonSerializer<StatsResponse>.Default, cancellationToken)).First();
Assert.Single(stat.Endpoints);
var endpointStats = stat.Endpoints.First();
Assert.Equal("e1", endpointStats.Name);
Expand All @@ -133,40 +133,39 @@ public async Task Add_groups_metadata_and_stats()
await s1.AddEndpointAsync<int>(
name: "baz",
subject: "foo.baz",
handler: m => ValueTask.CompletedTask,
handler: _ => ValueTask.CompletedTask,
cancellationToken: cancellationToken);

await s1.AddEndpointAsync<int>(
subject: "foo.bar1",
handler: m => ValueTask.CompletedTask,
handler: _ => ValueTask.CompletedTask,
cancellationToken: cancellationToken);

var grp1 = await s1.AddGroupAsync("grp1", cancellationToken: cancellationToken);

await grp1.AddEndpointAsync<int>(
name: "e1",
handler: m => ValueTask.CompletedTask,
handler: _ => ValueTask.CompletedTask,
cancellationToken: cancellationToken);

await grp1.AddEndpointAsync<int>(
name: "e2",
subject: "foo.bar2",
handler: m => ValueTask.CompletedTask,
handler: _ => ValueTask.CompletedTask,
cancellationToken: cancellationToken);

var grp2 = await s1.AddGroupAsync(string.Empty, queueGroup: "q_empty", cancellationToken: cancellationToken);

await grp2.AddEndpointAsync<int>(
name: "empty1",
subject: "foo.empty1",
handler: m => ValueTask.CompletedTask,
handler: _ => ValueTask.CompletedTask,
cancellationToken: cancellationToken);

// Check that the endpoints are registered correctly
{
var info = (await FindServices<InfoResponse>(nats, "$SRV.INFO.s1", 1, cancellationToken)).First();
var info = (await nats.FindServicesAsync("$SRV.INFO.s1", 1, NatsSrvJsonSerializer<InfoResponse>.Default, cancellationToken)).First();
Assert.Equal(5, info.Endpoints.Count);
var endpoints = info.Endpoints.ToList();

Assert.Equal("foo.baz", info.Endpoints.First(e => e.Name == "baz").Subject);
Assert.Equal("q", info.Endpoints.First(e => e.Name == "baz").QueueGroup);
Expand Down Expand Up @@ -197,13 +196,13 @@ await grp2.AddEndpointAsync<int>(
await s2.AddEndpointAsync<int>(
name: "s2baz",
subject: "s2foo.baz",
handler: m => ValueTask.CompletedTask,
handler: _ => ValueTask.CompletedTask,
metadata: new Dictionary<string, string> { { "ep-k1", "ep-v1" } },
cancellationToken: cancellationToken);

// Check default queue group and stats handler
{
var info = (await FindServices<InfoResponse>(nats, "$SRV.INFO.s2", 1, cancellationToken)).First();
var info = (await nats.FindServicesAsync("$SRV.INFO.s2", 1, NatsSrvJsonSerializer<InfoResponse>.Default, cancellationToken)).First();
Assert.Single(info.Endpoints);
var epi = info.Endpoints.First();

Expand All @@ -212,7 +211,7 @@ await s2.AddEndpointAsync<int>(
Assert.Equal("q2", epi.QueueGroup);
Assert.Equal("ep-v1", epi.Metadata["ep-k1"]);

var stat = (await FindServices<StatsResponse>(nats, "$SRV.STATS.s2", 1, cancellationToken)).First();
var stat = (await nats.FindServicesAsync("$SRV.STATS.s2", 1, NatsSrvJsonSerializer<StatsResponse>.Default, cancellationToken)).First();
Assert.Equal("v1", stat.Metadata["k1"]);
Assert.Equal("v2", stat.Metadata["k2"]);
Assert.Single(stat.Endpoints);
Expand All @@ -222,40 +221,4 @@ await s2.AddEndpointAsync<int>(
Assert.Equal("s2baz", eps.Data["ep_name"]?.GetValue<string>());
}
}

private static async Task<List<T>> FindServices<T>(NatsConnection nats, string subject, int limit, CancellationToken ct)
{
var replyOpts = new NatsSubOpts
{
Timeout = TimeSpan.FromSeconds(2),
};
var responses = new List<T>();

await Retry.Until("service is found", async () =>
{
var count = 0;
await foreach (var msg in nats.RequestManyAsync<object?, T>(subject, null, replySerializer: NatsSrvJsonSerializer<T>.Default, replyOpts: replyOpts, cancellationToken: ct).ConfigureAwait(false))
{
if (++count == limit)
break;
}

return count == limit;
});

var count = 0;
await foreach (var msg in nats.RequestManyAsync<object?, T>(subject, null, replySerializer: NatsSrvJsonSerializer<T>.Default, replyOpts: replyOpts, cancellationToken: ct).ConfigureAwait(false))
{
responses.Add(msg.Data!);
if (++count == limit)
break;
}

if (count != limit)
{
throw new Exception($"Find service error: Expected {limit} responses but got {count}");
}

return responses;
}
}
39 changes: 39 additions & 0 deletions tests/NATS.Client.TestUtilities/Utils.cs
Original file line number Diff line number Diff line change
Expand Up @@ -121,3 +121,42 @@ public static string Dump(this in ReadOnlySpan<byte> span)
return sb.ToString();
}
}

public static class ServiceUtils
{
public static async Task<List<T>> FindServicesAsync<T>(this NatsConnection nats, string subject, int limit, INatsDeserialize<T> serializer, CancellationToken ct)
{
var replyOpts = new NatsSubOpts
{
Timeout = TimeSpan.FromSeconds(2),
};
var responses = new List<T>();

await Retry.Until("service is found", async () =>
{
var count = 0;
await foreach (var msg in nats.RequestManyAsync<object?, T>(subject, null, replySerializer: serializer, replyOpts: replyOpts, cancellationToken: ct).ConfigureAwait(false))
{
if (++count == limit)
break;
}

return count == limit;
});

var count = 0;
await foreach (var msg in nats.RequestManyAsync<object?, T>(subject, null, replySerializer: serializer, replyOpts: replyOpts, cancellationToken: ct).ConfigureAwait(false))
{
responses.Add(msg.Data!);
if (++count == limit)
break;
}

if (count != limit)
{
throw new Exception($"Find service error: Expected {limit} responses but got {count}");
}

return responses;
}
}

0 comments on commit 4a7fd02

Please sign in to comment.