Skip to content

Commit

Permalink
feat(bindings): mqtt bindings
Browse files Browse the repository at this point in the history
  • Loading branch information
VisualBean committed Mar 25, 2024
1 parent 8d128db commit 544cfdf
Show file tree
Hide file tree
Showing 6 changed files with 414 additions and 0 deletions.
9 changes: 9 additions & 0 deletions src/LEGO.AsyncAPI.Bindings/BindingsCollection.cs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ namespace LEGO.AsyncAPI.Bindings
using LEGO.AsyncAPI.Bindings.AMQP;
using LEGO.AsyncAPI.Bindings.Http;
using LEGO.AsyncAPI.Bindings.Kafka;
using LEGO.AsyncAPI.Bindings.MQTT;
using LEGO.AsyncAPI.Bindings.Pulsar;
using LEGO.AsyncAPI.Bindings.Sns;
using LEGO.AsyncAPI.Bindings.Sqs;
Expand Down Expand Up @@ -53,6 +54,7 @@ public static TCollection Add<TCollection, TItem>(
Sqs,
Sns,
AMQP,
MQTT,
};

public static IEnumerable<IBindingParser<IBinding>> Http => new List<IBindingParser<IBinding>>
Expand Down Expand Up @@ -98,5 +100,12 @@ public static TCollection Add<TCollection, TItem>(
new AMQPOperationBinding(),
new AMQPMessageBinding(),
};

public static IEnumerable<IBindingParser<IBinding>> MQTT => new List<IBindingParser<IBinding>>
{
new MQTTServerBinding(),
new MQTTOperationBinding(),
new MQTTMessageBinding(),
};
}
}
48 changes: 48 additions & 0 deletions src/LEGO.AsyncAPI.Bindings/MQTT/LastWill.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
// Copyright (c) The LEGO Group. All rights reserved.

namespace LEGO.AsyncAPI.Bindings.MQTT
{
using LEGO.AsyncAPI.Models.Interfaces;
using LEGO.AsyncAPI.Writers;
using System;

public class LastWill : IAsyncApiElement
{
/// <summary>
/// The topic where the Last Will and Testament message will be sent.
/// </summary>
public string Topic { get; set; }

/// <summary>
/// Defines how hard the broker/client will try to ensure that
/// the Last Will and Testament message is received.
/// Its value MUST be either 0, 1 or 2.
/// </summary>
public uint? QoS { get; set; }

/// <summary>
/// Last Will message.
/// </summary>
public string Message { get; set; }

/// <summary>
/// Whether the broker should retain the Last Will and Testament message or not.
/// </summary>
public bool Retain { get; set; }

public void Serialize(IAsyncApiWriter writer)
{
if (writer is null)
{
throw new ArgumentNullException(nameof(writer));
}

writer.WriteStartObject();
writer.WriteRequiredProperty("topic", this.Topic);
writer.WriteOptionalProperty("qos", (int?)this.QoS);
writer.WriteOptionalProperty("message", this.Message);
writer.WriteRequiredProperty("retain", this.Retain);
writer.WriteEndObject();
}
}
}
65 changes: 65 additions & 0 deletions src/LEGO.AsyncAPI.Bindings/MQTT/MQTTMessageBinding.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
// Copyright (c) The LEGO Group. All rights reserved.

namespace LEGO.AsyncAPI.Bindings.MQTT
{
using System;
using LEGO.AsyncAPI.Models;
using LEGO.AsyncAPI.Readers;
using LEGO.AsyncAPI.Readers.ParseNodes;
using LEGO.AsyncAPI.Writers;

/// <summary>
/// Binding class for MQTT messages.
/// </summary>
public class MQTTMessageBinding : MessageBinding<MQTTMessageBinding>
{
/// <summary>
/// Indicates the format of the payload.
/// Either: 0 (zero) for unspecified bytes, or 1 for UTF-8 encoded character data.
/// </summary>
public int? PayloadFormatIndicator { get; set; }

/// <summary>
/// Correlation Data is used to identify the request the response message is for.
/// </summary>
public AsyncApiSchema CorrelationData { get; set; }

/// <summary>
/// String describing the content type of the message payload.
/// This should not conflict with the contentType field of the associated AsyncAPI Message object.
/// </summary>
public string ContentType { get; set; }

/// <summary>
/// The topic (channel URI) for a response message.
/// </summary>
public string ResponseTopic { get; set; }

public override void SerializeProperties(IAsyncApiWriter writer)
{
if (writer is null)
{
throw new ArgumentNullException(nameof(writer));
}

writer.WriteStartObject();
writer.WriteOptionalProperty("payloadFormatIndicator", this.PayloadFormatIndicator);
writer.WriteOptionalObject("correlationData", this.CorrelationData, (w, h) => h.SerializeV2(w));
writer.WriteOptionalProperty("contentType", this.ContentType);
writer.WriteOptionalProperty("responseTopic", this.ResponseTopic);
writer.WriteOptionalProperty("bindingVersion", this.BindingVersion);
writer.WriteExtensions(this.Extensions);
writer.WriteEndObject();
}

public override string BindingKey => "mqtt";

protected override FixedFieldMap<MQTTMessageBinding> FixedFieldMap => new ()
{
{ "payloadFormatIndicator", (a, n) => { a.PayloadFormatIndicator = n.GetIntegerValueOrDefault(); } },
{ "correlationData", (a, n) => { a.CorrelationData = JsonSchemaDeserializer.LoadSchema(n); } },
{ "contentType", (a, n) => { a.ContentType = n.GetScalarValue(); } },
{ "responseTopic", (a, n) => { a.ResponseTopic = n.GetScalarValue(); } },
};
}
}
61 changes: 61 additions & 0 deletions src/LEGO.AsyncAPI.Bindings/MQTT/MQTTOperationBinding.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
// Copyright (c) The LEGO Group. All rights reserved.

namespace LEGO.AsyncAPI.Bindings.MQTT
{
using System;
using System.Collections.Generic;
using LEGO.AsyncAPI.Models;
using LEGO.AsyncAPI.Readers;
using LEGO.AsyncAPI.Readers.ParseNodes;
using LEGO.AsyncAPI.Writers;

/// <summary>
/// Binding class for MQTT operations.
/// </summary>
public class MQTTOperationBinding : OperationBinding<MQTTOperationBinding>
{
/// <summary>
/// Defines the Quality of Service (QoS) levels for the message flow between client and server.
/// Its value MUST be either 0 (At most once delivery), 1 (At least once delivery), or 2 (Exactly once delivery).
/// </summary>
public int QoS { get; set; }

/// <summary>
/// Whether the broker should retain the message or not.
/// </summary>
public bool Retain { get; set; }

/// <summary>
/// Interval in seconds or a Schema Object containing the definition of the lifetime of the message.
/// </summary>
public int? MessageExpiryInterval { get; set; }

public override string BindingKey => "mqtt";

protected override FixedFieldMap<MQTTOperationBinding> FixedFieldMap => new()
{
{ "qos", (a, n) => { a.QoS = n.GetIntegerValue(); } },
{ "retain", (a, n) => { a.Retain = n.GetBooleanValue(); } },
{ "messageExpiryInterval", (a, n) => { a.MessageExpiryInterval = n.GetIntegerValueOrDefault(); } },
};

/// <summary>
/// Serialize to AsyncAPI V2 document without using reference.
/// </summary>
public override void SerializeProperties(IAsyncApiWriter writer)
{
if (writer is null)
{
throw new ArgumentNullException(nameof(writer));
}

writer.WriteStartObject();
writer.WriteRequiredProperty("qos", this.QoS);
writer.WriteRequiredProperty("retain", this.Retain);
writer.WriteOptionalProperty("messageExpiryInterval", this.MessageExpiryInterval);
writer.WriteOptionalProperty("bindingVersion", this.BindingVersion);
writer.WriteExtensions(this.Extensions);
writer.WriteEndObject();
}
}
}
93 changes: 93 additions & 0 deletions src/LEGO.AsyncAPI.Bindings/MQTT/MQTTServerBinding.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,93 @@
// Copyright (c) The LEGO Group. All rights reserved.

namespace LEGO.AsyncAPI.Bindings.MQTT
{
using System;
using LEGO.AsyncAPI.Models;
using LEGO.AsyncAPI.Readers.ParseNodes;
using LEGO.AsyncAPI.Writers;

/// <summary>
/// Binding class for MQTT channel settings.
/// </summary>
public class MQTTServerBinding : ServerBinding<MQTTServerBinding>
{
/// <summary>
/// The client identifier.
/// </summary>
public string ClientId { get; set; }

/// <summary>
/// Whether to create a persistent connection or not.
/// When false, the connection will be persistent.
/// This is called clean start in MQTTv5.
/// </summary>
public bool? CleanSession { get; set; }

/// <summary>
/// Last Will and Testament configuration.
/// </summary>
public LastWill LastWill { get; set; }

/// <summary>
/// Interval in seconds of the longest period of time
/// the broker and the client can endure without sending a message.
/// </summary>
public int? KeepAlive { get; set; }

/// <summary>
/// Interval in seconds the broker maintains a session
/// for a disconnected client until this interval expires.
/// </summary>
public int? SessionExpiryInterval { get; set; }

/// <summary>
/// Number of bytes representing the maximum packet size
/// the client is willing to accept.
/// </summary>
public int? MaximumPacketSize { get; set; }

public override string BindingKey => "mqtt";

protected override FixedFieldMap<MQTTServerBinding> FixedFieldMap => new ()
{
{ "bindingVersion", (a, n) => { a.BindingVersion = n.GetScalarValue(); } },
{ "clientId", (a, n) => { a.ClientId = n.GetScalarValue(); } },
{ "cleanSession", (a, n) => { a.CleanSession = n.GetBooleanValueOrDefault(); } },
{ "lastWill", (a, n) => { a.LastWill = n.ParseMap(LastWillFixedFields); } },
{ "keepAlive", (a, n) => { a.KeepAlive = n.GetIntegerValueOrDefault(); } },
{ "sessionExpiryInterval", (a, n) => { a.SessionExpiryInterval = n.GetIntegerValueOrDefault(); } },
{ "maximumPacketSize", (a, n) => { a.MaximumPacketSize = n.GetIntegerValueOrDefault(); } },
};

private static FixedFieldMap<LastWill> LastWillFixedFields = new ()
{
{ "topic", (a, n) => { a.Topic = n.GetScalarValue(); } },
{ "qos", (a, n) => { a.QoS = (uint?)n.GetIntegerValueOrDefault(); } },
{ "message", (a, n) => { a.Message = n.GetScalarValue(); } },
{ "retain", (a, n) => { a.Retain = n.GetBooleanValue(); } },
};

/// <summary>
/// Serialize to AsyncAPI V2 document without using reference.
/// </summary>
public override void SerializeProperties(IAsyncApiWriter writer)
{
if (writer is null)
{
throw new ArgumentNullException(nameof(writer));
}

writer.WriteStartObject();
writer.WriteRequiredProperty("clientId", this.ClientId);
writer.WriteOptionalProperty("cleanSession", this.CleanSession);
writer.WriteOptionalObject("lastWill", this.LastWill, (w, l) => l.Serialize(w));
writer.WriteOptionalProperty("keepAlive", this.KeepAlive);
writer.WriteOptionalProperty("sessionExpiryInterval", this.SessionExpiryInterval);
writer.WriteOptionalProperty("maximumPacketSize", this.MaximumPacketSize);
writer.WriteOptionalProperty(AsyncApiConstants.BindingVersion, this.BindingVersion);
writer.WriteExtensions(this.Extensions);
writer.WriteEndObject();
}
}
}
Loading

0 comments on commit 544cfdf

Please sign in to comment.