Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: add avro logicaltypes #195

Merged
merged 1 commit into from
Oct 14, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
153 changes: 152 additions & 1 deletion src/LEGO.AsyncAPI.Readers/V2/AsyncApiAvroSchemaDeserializer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,10 @@
namespace LEGO.AsyncAPI.Readers
{
using System;
using System.Threading;
using LEGO.AsyncAPI.Exceptions;
using LEGO.AsyncAPI.Models;
using LEGO.AsyncAPI.Models.Avro.LogicalTypes;
using LEGO.AsyncAPI.Readers.Exceptions;
using LEGO.AsyncAPI.Readers.ParseNodes;
using LEGO.AsyncAPI.Writers;
Expand Down Expand Up @@ -68,6 +70,60 @@ public class AsyncApiAvroSchemaDeserializer
{ "types", (a, n) => a.Types = n.CreateList(LoadSchema) },
};

private static readonly FixedFieldMap<AvroDecimal> DecimalFixedFields = new()
{
{ "type", (a, n) => { } },
{ "logicalType", (a, n) => { } },
{ "precision", (a, n) => a.Precision = int.Parse(n.GetScalarValue()) },
{ "scale", (a, n) => a.Scale = int.Parse(n.GetScalarValue()) },
};

private static readonly FixedFieldMap<AvroUUID> UUIDFixedFields = new()
{
{ "type", (a, n) => { } },
{ "logicalType", (a, n) => { } },
};

private static readonly FixedFieldMap<AvroDate> DateFixedFields = new()
{
{ "type", (a, n) => { } },
{ "logicalType", (a, n) => { } },
};

private static readonly FixedFieldMap<AvroTimeMillis> TimeMillisFixedFields = new()
{
{ "type", (a, n) => { } },
{ "logicalType", (a, n) => { } },
};

private static readonly FixedFieldMap<AvroTimeMicros> TimeMicrosFixedFields = new()
{
{ "type", (a, n) => { } },
{ "logicalType", (a, n) => { } },
};

private static readonly FixedFieldMap<AvroTimestampMillis> TimestampMillisFixedFields = new()
{
{ "type", (a, n) => { } },
{ "logicalType", (a, n) => { } },
};

private static readonly FixedFieldMap<AvroTimestampMicros> TimestampMicrosFixedFields = new()
{
{ "type", (a, n) => { } },
{ "logicalType", (a, n) => { } },
};

private static readonly FixedFieldMap<AvroDuration> DurationFixedFields = new()
{
{ "type", (a, n) => { } },
{ "logicalType", (a, n) => { } },
{ "name", (a, n) => a.Name = n.GetScalarValue() },
{ "namespace", (a, n) => a.Namespace = n.GetScalarValue() },
{ "aliases", (a, n) => a.Aliases = n.CreateSimpleList(n2 => n2.GetScalarValue()) },
{ "size", (a, n) => { } },
};

private static readonly PatternFieldMap<AvroRecord> RecordMetadataPatternFields =
new()
{
Expand Down Expand Up @@ -110,6 +166,54 @@ public class AsyncApiAvroSchemaDeserializer
{ s => s.StartsWith(string.Empty), (a, p, n) => a.Metadata[p] = n.CreateAny() },
};

private static readonly PatternFieldMap<AvroDecimal> DecimalMetadataPatternFields =
new()
{
{ s => s.StartsWith(string.Empty), (a, p, n) => a.Metadata[p] = n.CreateAny() },
};

private static readonly PatternFieldMap<AvroUUID> UUIDMetadataPatternFields =
new()
{
{ s => s.StartsWith(string.Empty), (a, p, n) => a.Metadata[p] = n.CreateAny() },
};

private static readonly PatternFieldMap<AvroDate> DateMetadataPatternFields =
new()
{
{ s => s.StartsWith(string.Empty), (a, p, n) => a.Metadata[p] = n.CreateAny() },
};

private static readonly PatternFieldMap<AvroTimeMillis> TimeMillisMetadataPatternFields =
new()
{
{ s => s.StartsWith(string.Empty), (a, p, n) => a.Metadata[p] = n.CreateAny() },
};

private static readonly PatternFieldMap<AvroTimeMicros> TimeMicrosMetadataPatternFields =
new()
{
{ s => s.StartsWith(string.Empty), (a, p, n) => a.Metadata[p] = n.CreateAny() },
};

private static readonly PatternFieldMap<AvroTimestampMillis> TimestampMillisMetadataPatternFields =
new()
{
{ s => s.StartsWith(string.Empty), (a, p, n) => a.Metadata[p] = n.CreateAny() },
};

private static readonly PatternFieldMap<AvroTimestampMicros> TimestampMicrosMetadataPatternFields =
new()
{
{ s => s.StartsWith(string.Empty), (a, p, n) => a.Metadata[p] = n.CreateAny() },
};

private static readonly PatternFieldMap<AvroDuration> DurationMetadataPatternFields =
new()
{
{ s => s.StartsWith(string.Empty), (a, p, n) => a.Metadata[p] = n.CreateAny() },
};

public static AvroSchema LoadSchema(ParseNode node)
{
if (node is ValueNode valueNode)
Expand Down Expand Up @@ -141,8 +245,13 @@ public static AvroSchema LoadSchema(ParseNode node)
};
}

var type = mapNode["type"]?.Value.GetScalarValue();
var isLogicalType = mapNode["logicalType"] != null;
if (isLogicalType)
{
return LoadLogicalType(mapNode);
}

var type = mapNode["type"]?.Value.GetScalarValue();
switch (type)
{
case "record":
Expand Down Expand Up @@ -177,6 +286,48 @@ public static AvroSchema LoadSchema(ParseNode node)
throw new AsyncApiReaderException("Invalid node type");
}

private static AvroSchema LoadLogicalType(MapNode mapNode)
{
var type = mapNode["logicalType"]?.Value.GetScalarValue();
switch (type)
{
case "decimal":
var @decimal = new AvroDecimal();
mapNode.ParseFields(ref @decimal, DecimalFixedFields, DecimalMetadataPatternFields);
return @decimal;
case "uuid":
var uuid = new AvroUUID();
mapNode.ParseFields(ref uuid, UUIDFixedFields, UUIDMetadataPatternFields);
return uuid;
case "date":
var date = new AvroDate();
mapNode.ParseFields(ref date, DateFixedFields, DateMetadataPatternFields);
return date;
case "time-millis":
var timeMillis = new AvroTimeMillis();
mapNode.ParseFields(ref timeMillis, TimeMillisFixedFields, TimeMillisMetadataPatternFields);
return timeMillis;
case "time-micros":
var timeMicros = new AvroTimeMicros();
mapNode.ParseFields(ref timeMicros, TimeMicrosFixedFields, TimeMicrosMetadataPatternFields);
return timeMicros;
case "timestamp-millis":
var timestampMillis = new AvroTimestampMillis();
mapNode.ParseFields(ref timestampMillis, TimestampMillisFixedFields, TimestampMillisMetadataPatternFields);
return timestampMillis;
case "timestamp-micros":
var timestampMicros = new AvroTimestampMicros();
mapNode.ParseFields(ref timestampMicros, TimestampMicrosFixedFields, TimestampMicrosMetadataPatternFields);
return timestampMicros;
case "duration":
var duration = new AvroDuration();
mapNode.ParseFields(ref duration, DurationFixedFields, DurationMetadataPatternFields);
return duration;
default:
throw new AsyncApiException($"Unsupported type: {type}");
}
}

private static AvroField LoadField(ParseNode node)
{
var mapNode = node.CheckMapNode("field");
Expand Down
1 change: 0 additions & 1 deletion src/LEGO.AsyncAPI/Models/Avro/AvroMap.cs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@

namespace LEGO.AsyncAPI.Models
{
using System;
using System.Collections.Generic;
using System.Linq;
using LEGO.AsyncAPI.Writers;
Expand Down
4 changes: 0 additions & 4 deletions src/LEGO.AsyncAPI/Models/Avro/AvroPrimitive.cs
Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,6 @@ public AvroPrimitive(AvroPrimitiveType type)
this.Type = type.GetDisplayName();
}

public AvroPrimitive()
{
}

public override void SerializeV2WithoutReference(IAsyncApiWriter writer)
{
writer.WriteValue(this.Type);
Expand Down
14 changes: 14 additions & 0 deletions src/LEGO.AsyncAPI/Models/Avro/LogicalTypes/AvroDate.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
// Copyright (c) The LEGO Group. All rights reserved.

namespace LEGO.AsyncAPI.Models.Avro.LogicalTypes
{
public class AvroDate : AvroLogicalType
{
public AvroDate()
: base(AvroPrimitiveType.Int)
{
}

public override LogicalType LogicalType => LogicalType.Date;
}
}
26 changes: 26 additions & 0 deletions src/LEGO.AsyncAPI/Models/Avro/LogicalTypes/AvroDecimal.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
// Copyright (c) The LEGO Group. All rights reserved.

namespace LEGO.AsyncAPI.Models.Avro.LogicalTypes
{
using LEGO.AsyncAPI.Writers;

public class AvroDecimal : AvroLogicalType
{
public AvroDecimal()
: base(AvroPrimitiveType.Bytes)
{
}

public override LogicalType LogicalType => LogicalType.Decimal;

public int? Scale { get; set; }

public int? Precision { get; set; }

public override void SerializeV2Core(IAsyncApiWriter writer)
{
writer.WriteOptionalProperty("scale", this.Scale);
writer.WriteOptionalProperty("precision", this.Precision);
}
}
}
42 changes: 42 additions & 0 deletions src/LEGO.AsyncAPI/Models/Avro/LogicalTypes/AvroDuration.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
// Copyright (c) The LEGO Group. All rights reserved.

namespace LEGO.AsyncAPI.Models.Avro.LogicalTypes
{
using System.Linq;
using LEGO.AsyncAPI.Writers;

public class AvroDuration : AvroFixed
{
public LogicalType LogicalType { get; } = LogicalType.Duration;

public new int Size { get; } = 12;

public override void SerializeV2WithoutReference(IAsyncApiWriter writer)
{
writer.WriteStartObject();
writer.WriteOptionalProperty("type", this.Type);
writer.WriteOptionalProperty("logicalType", this.LogicalType.GetDisplayName());
writer.WriteRequiredProperty("name", this.Name);
writer.WriteOptionalProperty("namespace", this.Namespace);
writer.WriteOptionalCollection("aliases", this.Aliases, (w, s) => w.WriteValue(s));
writer.WriteRequiredProperty("size", this.Size);
if (this.Metadata.Any())
{
foreach (var item in this.Metadata)
{
writer.WritePropertyName(item.Key);
if (item.Value == null)
{
writer.WriteNull();
}
else
{
writer.WriteAny(item.Value);
}
}
}

writer.WriteEndObject();
}
}
}
48 changes: 48 additions & 0 deletions src/LEGO.AsyncAPI/Models/Avro/LogicalTypes/AvroLogicalType.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
// Copyright (c) The LEGO Group. All rights reserved.

namespace LEGO.AsyncAPI.Models.Avro.LogicalTypes
{
using System.Linq;
using LEGO.AsyncAPI.Writers;

public abstract class AvroLogicalType : AvroPrimitive
{
protected AvroLogicalType(AvroPrimitiveType type)
: base(type)
{
}

public abstract LogicalType LogicalType { get; }

public virtual void SerializeV2Core(IAsyncApiWriter writer)
{
}

public override void SerializeV2WithoutReference(IAsyncApiWriter writer)
{
writer.WriteStartObject();
writer.WriteOptionalProperty("type", this.Type);
writer.WriteOptionalProperty("logicalType", this.LogicalType.GetDisplayName());

this.SerializeV2Core(writer);

if (this.Metadata.Any())
{
foreach (var item in this.Metadata)
{
writer.WritePropertyName(item.Key);
if (item.Value == null)
{
writer.WriteNull();
}
else
{
writer.WriteAny(item.Value);
}
}
}

writer.WriteEndObject();
}
}
}
14 changes: 14 additions & 0 deletions src/LEGO.AsyncAPI/Models/Avro/LogicalTypes/AvroTimeMicros.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
// Copyright (c) The LEGO Group. All rights reserved.

namespace LEGO.AsyncAPI.Models.Avro.LogicalTypes
{
public class AvroTimeMicros : AvroLogicalType
{
public AvroTimeMicros()
: base(AvroPrimitiveType.Long)
{
}

public override LogicalType LogicalType => LogicalType.Time_Micros;
}
}
14 changes: 14 additions & 0 deletions src/LEGO.AsyncAPI/Models/Avro/LogicalTypes/AvroTimeMillis.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
// Copyright (c) The LEGO Group. All rights reserved.

namespace LEGO.AsyncAPI.Models.Avro.LogicalTypes
{
public class AvroTimeMillis : AvroLogicalType
{
public AvroTimeMillis()
: base(AvroPrimitiveType.Int)
{
}

public override LogicalType LogicalType => LogicalType.Time_Millis;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
// Copyright (c) The LEGO Group. All rights reserved.

namespace LEGO.AsyncAPI.Models.Avro.LogicalTypes
{
public class AvroTimestampMicros : AvroLogicalType
{
public AvroTimestampMicros()
: base(AvroPrimitiveType.Long)
{
}

public override LogicalType LogicalType => LogicalType.Timestamp_Micros;
}
}
Loading
Loading