Skip to content

Commit

Permalink
JetStream enum PascalCase serializer (#225)
Browse files Browse the repository at this point in the history
* JetStream enum PascalCase serializer

System.Text.Json serializer has some difficult settings serializing enums.
Before we just relied on unconventional property names in code but proper
way is to use PascalCase. Unfortunately the JSON options also did not allow
snake_case conversion also I didn't want to force System.Text.Json v8 onto
net6.0 targets (if that supports it). Hence we now have a JetStream specific
JSON enum convertor.

* net8.0 supports snake_case enums
  • Loading branch information
mtmk authored Nov 21, 2023
1 parent a234754 commit fd88d9e
Show file tree
Hide file tree
Showing 27 changed files with 464 additions and 137 deletions.
3 changes: 2 additions & 1 deletion NATS.Client.sln.DotSettings
Original file line number Diff line number Diff line change
Expand Up @@ -9,4 +9,5 @@
<s:Boolean x:Key="/Default/UserDictionary/Words/=HMSG/@EntryIndexedValue">True</s:Boolean>
<s:Boolean x:Key="/Default/UserDictionary/Words/=HPUB/@EntryIndexedValue">True</s:Boolean>
<s:Boolean x:Key="/Default/UserDictionary/Words/=Msgs/@EntryIndexedValue">True</s:Boolean>
<s:Boolean x:Key="/Default/UserDictionary/Words/=Nuid/@EntryIndexedValue">True</s:Boolean></wpf:ResourceDictionary>
<s:Boolean x:Key="/Default/UserDictionary/Words/=Nuid/@EntryIndexedValue">True</s:Boolean>
<s:Boolean x:Key="/Default/UserDictionary/Words/=Workqueue/@EntryIndexedValue">True</s:Boolean></wpf:ResourceDictionary>
2 changes: 1 addition & 1 deletion sandbox/Example.JetStream.PullConsumer/Program.cs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@

var js = new NatsJSContext(nats);

var consumer = await js.CreateConsumerAsync("s1", new ConsumerConfig { Name = "c1", DurableName = "c1", AckPolicy = ConsumerConfigAckPolicy.@explicit });
var consumer = await js.CreateConsumerAsync("s1", new ConsumerConfig { Name = "c1", DurableName = "c1", AckPolicy = ConsumerConfigAckPolicy.Explicit });

var idle = TimeSpan.FromSeconds(5);
var expires = TimeSpan.FromSeconds(10);
Expand Down
232 changes: 232 additions & 0 deletions src/NATS.Client.JetStream/Internal/NatsJSJsonSerializer.cs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
using System.Text.Json;
using System.Text.Json.Serialization;
using NATS.Client.Core;
using NATS.Client.JetStream.Models;
Expand All @@ -6,7 +7,23 @@ namespace NATS.Client.JetStream.Internal;

internal static class NatsJSJsonSerializer<T>
{
#if NET6_0
public static readonly INatsSerializer<T> Default = new NatsJsonContextSerializer<T>(NatsJSJsonSerializerContext.Default);
#else
public static readonly INatsSerializer<T> Default = new NatsJsonContextSerializer<T>(new NatsJSJsonSerializerContext(new JsonSerializerOptions
{
Converters =
{
new JsonStringEnumConverter<ConsumerConfigDeliverPolicy>(JsonNamingPolicy.SnakeCaseLower),
new JsonStringEnumConverter<ConsumerConfigAckPolicy>(JsonNamingPolicy.SnakeCaseLower),
new JsonStringEnumConverter<ConsumerConfigReplayPolicy>(JsonNamingPolicy.SnakeCaseLower),
new JsonStringEnumConverter<StreamConfigCompression>(JsonNamingPolicy.SnakeCaseLower),
new JsonStringEnumConverter<StreamConfigDiscard>(JsonNamingPolicy.SnakeCaseLower),
new JsonStringEnumConverter<StreamConfigRetention>(JsonNamingPolicy.SnakeCaseLower),
new JsonStringEnumConverter<StreamConfigStorage>(JsonNamingPolicy.SnakeCaseLower),
},
}));
#endif
}

[JsonSerializable(typeof(AccountInfoResponse))]
Expand Down Expand Up @@ -86,3 +103,218 @@ internal static class NatsJSJsonSerializer<T>
internal partial class NatsJSJsonSerializerContext : JsonSerializerContext
{
}

#if NET6_0
internal class NatsJSJsonStringEnumConverter<TEnum> : JsonConverter<TEnum>
where TEnum : struct, Enum
{
public override TEnum Read(ref Utf8JsonReader reader, Type typeToConvert, JsonSerializerOptions options)
{
if (reader.TokenType != JsonTokenType.String)
{
throw new InvalidOperationException();
}

var stringValue = reader.GetString();

if (typeToConvert == typeof(ConsumerConfigDeliverPolicy))
{
switch (stringValue)
{
case "all":
return (TEnum)(object)ConsumerConfigDeliverPolicy.All;
case "last":
return (TEnum)(object)ConsumerConfigDeliverPolicy.Last;
case "new":
return (TEnum)(object)ConsumerConfigDeliverPolicy.New;
case "by_start_sequence":
return (TEnum)(object)ConsumerConfigDeliverPolicy.ByStartSequence;
case "by_start_time":
return (TEnum)(object)ConsumerConfigDeliverPolicy.ByStartTime;
case "last_per_subject":
return (TEnum)(object)ConsumerConfigDeliverPolicy.LastPerSubject;
}
}

if (typeToConvert == typeof(ConsumerConfigAckPolicy))
{
switch (stringValue)
{
case "none":
return (TEnum)(object)ConsumerConfigAckPolicy.None;
case "all":
return (TEnum)(object)ConsumerConfigAckPolicy.All;
case "explicit":
return (TEnum)(object)ConsumerConfigAckPolicy.Explicit;
}
}

if (typeToConvert == typeof(ConsumerConfigReplayPolicy))
{
switch (stringValue)
{
case "instant":
return (TEnum)(object)ConsumerConfigReplayPolicy.Instant;
case "original":
return (TEnum)(object)ConsumerConfigReplayPolicy.Original;
}
}

if (typeToConvert == typeof(StreamConfigCompression))
{
switch (stringValue)
{
case "none":
return (TEnum)(object)StreamConfigCompression.None;
case "s2":
return (TEnum)(object)StreamConfigCompression.S2;
}
}

if (typeToConvert == typeof(StreamConfigDiscard))
{
switch (stringValue)
{
case "old":
return (TEnum)(object)StreamConfigDiscard.Old;
case "new":
return (TEnum)(object)StreamConfigDiscard.New;
}
}

if (typeToConvert == typeof(StreamConfigRetention))
{
switch (stringValue)
{
case "limits":
return (TEnum)(object)StreamConfigRetention.Limits;
case "interest":
return (TEnum)(object)StreamConfigRetention.Interest;
case "workqueue":
return (TEnum)(object)StreamConfigRetention.Workqueue;
}
}

if (typeToConvert == typeof(StreamConfigStorage))
{
switch (stringValue)
{
case "file":
return (TEnum)(object)StreamConfigStorage.File;
case "memory":
return (TEnum)(object)StreamConfigStorage.Memory;
}
}

throw new InvalidOperationException($"Reading unknown enum type {typeToConvert.Name} or value {stringValue}");
}

public override void Write(Utf8JsonWriter writer, TEnum value, JsonSerializerOptions options)
{
if (value is ConsumerConfigDeliverPolicy consumerConfigDeliverPolicy)
{
switch (consumerConfigDeliverPolicy)
{
case ConsumerConfigDeliverPolicy.All:
writer.WriteStringValue("all");
return;
case ConsumerConfigDeliverPolicy.Last:
writer.WriteStringValue("last");
return;
case ConsumerConfigDeliverPolicy.New:
writer.WriteStringValue("new");
return;
case ConsumerConfigDeliverPolicy.ByStartSequence:
writer.WriteStringValue("by_start_sequence");
return;
case ConsumerConfigDeliverPolicy.ByStartTime:
writer.WriteStringValue("by_start_time");
return;
case ConsumerConfigDeliverPolicy.LastPerSubject:
writer.WriteStringValue("last_per_subject");
return;
}
}
else if (value is ConsumerConfigAckPolicy consumerConfigAckPolicy)
{
switch (consumerConfigAckPolicy)
{
case ConsumerConfigAckPolicy.None:
writer.WriteStringValue("none");
return;
case ConsumerConfigAckPolicy.All:
writer.WriteStringValue("all");
return;
case ConsumerConfigAckPolicy.Explicit:
writer.WriteStringValue("explicit");
return;
}
}
else if (value is ConsumerConfigReplayPolicy consumerConfigReplayPolicy)
{
switch (consumerConfigReplayPolicy)
{
case ConsumerConfigReplayPolicy.Instant:
writer.WriteStringValue("instant");
return;
case ConsumerConfigReplayPolicy.Original:
writer.WriteStringValue("original");
return;
}
}
else if (value is StreamConfigCompression streamConfigCompression)
{
switch (streamConfigCompression)
{
case StreamConfigCompression.None:
writer.WriteStringValue("none");
return;
case StreamConfigCompression.S2:
writer.WriteStringValue("s2");
return;
}
}
else if (value is StreamConfigDiscard streamConfigDiscard)
{
switch (streamConfigDiscard)
{
case StreamConfigDiscard.Old:
writer.WriteStringValue("old");
return;
case StreamConfigDiscard.New:
writer.WriteStringValue("new");
return;
}
}
else if (value is StreamConfigRetention streamConfigRetention)
{
switch (streamConfigRetention)
{
case StreamConfigRetention.Limits:
writer.WriteStringValue("limits");
return;
case StreamConfigRetention.Interest:
writer.WriteStringValue("interest");
return;
case StreamConfigRetention.Workqueue:
writer.WriteStringValue("workqueue");
return;
}
}
else if (value is StreamConfigStorage streamConfigStorage)
{
switch (streamConfigStorage)
{
case StreamConfigStorage.File:
writer.WriteStringValue("file");
return;
case StreamConfigStorage.Memory:
writer.WriteStringValue("memory");
return;
}
}

throw new InvalidOperationException($"Writing unknown enum value {value.GetType().Name}.{value}");
}
}
#endif
10 changes: 5 additions & 5 deletions src/NATS.Client.JetStream/Internal/NatsJSOrderedPushConsumer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ internal record NatsJSOrderedPushConsumerOpts
/// </summary>
public TimeSpan IdleHeartbeat { get; init; } = TimeSpan.FromSeconds(5);

public ConsumerConfigDeliverPolicy DeliverPolicy { get; init; } = ConsumerConfigDeliverPolicy.all;
public ConsumerConfigDeliverPolicy DeliverPolicy { get; init; } = ConsumerConfigDeliverPolicy.All;

public bool HeadersOnly { get; init; } = false;
}
Expand Down Expand Up @@ -338,8 +338,8 @@ private async ValueTask CreatePushConsumer(string origin)
var config = new ConsumerConfig
{
Name = Consumer,
DeliverPolicy = ConsumerConfigDeliverPolicy.all,
AckPolicy = ConsumerConfigAckPolicy.none,
DeliverPolicy = ConsumerConfigDeliverPolicy.All,
AckPolicy = ConsumerConfigAckPolicy.None,
DeliverSubject = _sub.Subject,
FilterSubject = _filter,
FlowControl = true,
Expand All @@ -348,15 +348,15 @@ private async ValueTask CreatePushConsumer(string origin)
MaxDeliver = 1,
MemStorage = true,
NumReplicas = 1,
ReplayPolicy = ConsumerConfigReplayPolicy.instant,
ReplayPolicy = ConsumerConfigReplayPolicy.Instant,
};

config.DeliverPolicy = _opts.DeliverPolicy;
config.HeadersOnly = _opts.HeadersOnly;

if (sequence > 0)
{
config.DeliverPolicy = ConsumerConfigDeliverPolicy.by_start_sequence;
config.DeliverPolicy = ConsumerConfigDeliverPolicy.ByStartSequence;
config.OptStartSeq = sequence + 1;
}

Expand Down
20 changes: 8 additions & 12 deletions src/NATS.Client.JetStream/Models/ConsumerConfig.cs
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
using NATS.Client.JetStream.Internal;

namespace NATS.Client.JetStream.Models;

public record ConsumerConfig
Expand All @@ -10,16 +12,14 @@ public ConsumerConfig(string name)
{
Name = name;
DurableName = name;
AckPolicy = ConsumerConfigAckPolicy.@explicit;
AckPolicy = ConsumerConfigAckPolicy.Explicit;
}

[System.Text.Json.Serialization.JsonPropertyName("deliver_policy")]
[System.Text.Json.Serialization.JsonIgnore(Condition = System.Text.Json.Serialization.JsonIgnoreCondition.Never)]
[System.ComponentModel.DataAnnotations.Required(AllowEmptyStrings = true)]
#if NET6_0
[System.Text.Json.Serialization.JsonConverter(typeof(System.Text.Json.Serialization.JsonStringEnumConverter))]
#else
[System.Text.Json.Serialization.JsonConverter(typeof(System.Text.Json.Serialization.JsonStringEnumConverter<ConsumerConfigDeliverPolicy>))]
[System.Text.Json.Serialization.JsonConverter(typeof(NatsJSJsonStringEnumConverter<ConsumerConfigDeliverPolicy>))]
#endif
public ConsumerConfigDeliverPolicy DeliverPolicy { get; set; } = default!;

Expand Down Expand Up @@ -67,11 +67,9 @@ public ConsumerConfig(string name)
[System.Text.Json.Serialization.JsonIgnore(Condition = System.Text.Json.Serialization.JsonIgnoreCondition.Never)]
[System.ComponentModel.DataAnnotations.Required(AllowEmptyStrings = true)]
#if NET6_0
[System.Text.Json.Serialization.JsonConverter(typeof(System.Text.Json.Serialization.JsonStringEnumConverter))]
#else
[System.Text.Json.Serialization.JsonConverter(typeof(System.Text.Json.Serialization.JsonStringEnumConverter<ConsumerConfigAckPolicy>))]
[System.Text.Json.Serialization.JsonConverter(typeof(NatsJSJsonStringEnumConverter<ConsumerConfigAckPolicy>))]
#endif
public ConsumerConfigAckPolicy AckPolicy { get; set; } = ConsumerConfigAckPolicy.none;
public ConsumerConfigAckPolicy AckPolicy { get; set; } = ConsumerConfigAckPolicy.None;

/// <summary>
/// How long (in nanoseconds) to allow messages to remain un-acknowledged before attempting redelivery
Expand Down Expand Up @@ -107,11 +105,9 @@ public ConsumerConfig(string name)
[System.Text.Json.Serialization.JsonIgnore(Condition = System.Text.Json.Serialization.JsonIgnoreCondition.Never)]
[System.ComponentModel.DataAnnotations.Required(AllowEmptyStrings = true)]
#if NET6_0
[System.Text.Json.Serialization.JsonConverter(typeof(System.Text.Json.Serialization.JsonStringEnumConverter))]
#else
[System.Text.Json.Serialization.JsonConverter(typeof(System.Text.Json.Serialization.JsonStringEnumConverter<ConsumerConfigReplayPolicy>))]
[System.Text.Json.Serialization.JsonConverter(typeof(NatsJSJsonStringEnumConverter<ConsumerConfigReplayPolicy>))]
#endif
public ConsumerConfigReplayPolicy ReplayPolicy { get; set; } = NATS.Client.JetStream.Models.ConsumerConfigReplayPolicy.instant;
public ConsumerConfigReplayPolicy ReplayPolicy { get; set; } = NATS.Client.JetStream.Models.ConsumerConfigReplayPolicy.Instant;

[System.Text.Json.Serialization.JsonPropertyName("sample_freq")]
[System.Text.Json.Serialization.JsonIgnore(Condition = System.Text.Json.Serialization.JsonIgnoreCondition.WhenWritingDefault)]
Expand Down
13 changes: 3 additions & 10 deletions src/NATS.Client.JetStream/Models/ConsumerConfigAckPolicy.cs
Original file line number Diff line number Diff line change
@@ -1,15 +1,8 @@
namespace NATS.Client.JetStream.Models;

// TODO: enum member naming with JSON serialization isn't working for some reason
#pragma warning disable SA1300
public enum ConsumerConfigAckPolicy
{
[System.Runtime.Serialization.EnumMember(Value = @"none")]
none = 0,

[System.Runtime.Serialization.EnumMember(Value = @"all")]
all = 1,

[System.Runtime.Serialization.EnumMember(Value = @"explicit")]
@explicit = 2,
None = 0,
All = 1,
Explicit = 2,
}
Loading

0 comments on commit fd88d9e

Please sign in to comment.