Skip to content

Commit

Permalink
AOT compatibility
Browse files Browse the repository at this point in the history
This replaces `IsTrimmable` with `IsAotCompatible` which contains better analyzers. As a result, the deserialize and consume logic has been updated to handle specifics such as generics that cannot be inferred in AOT.

Initial support for trimming in #564 did not have support for JSON serializer. Using the slim event bus meant you needed to create your own serializer and register it as a default. With this PR, the `DefaultJsonEventSerializer` support the use of a `JsonSerializerContext` where each event has been declared as `EventEnvelope<TEvent>`.

Consequently, registration of consumers requires explicit events as shown in the sample
  • Loading branch information
mburumaxwell committed Jun 6, 2024
1 parent 09cdcf7 commit 1d1bfff
Show file tree
Hide file tree
Showing 49 changed files with 1,000 additions and 582 deletions.
2 changes: 1 addition & 1 deletion Directory.Build.props
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
</PropertyGroup>

<PropertyGroup>
<WarningsAsErrors>$(WarningsAsErrors),IL2026</WarningsAsErrors>
<WarningsAsErrors>$(WarningsAsErrors),IL2026,IL2060,IL2091,IL2095,IL3050</WarningsAsErrors>
</PropertyGroup>

</Project>
7 changes: 7 additions & 0 deletions Tingle.EventBus.sln
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,8 @@ Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "samples", "samples", "{62F6
EndProject
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "AmazonSqsAndSns", "samples\AmazonSqsAndSns\AmazonSqsAndSns.csproj", "{C369A8E1-F29D-4705-BD38-28C3DE80D8DB}"
EndProject
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "AotSupport", "samples\AotSupport\AotSupport.csproj", "{63002328-9833-4FF3-9CE8-9134771E2455}"
EndProject
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "AzureIotHub", "samples\AzureIotHub\AzureIotHub.csproj", "{3759B206-BF8D-4E46-9B04-1C19F156D295}"
EndProject
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "AzureManagedIdentity", "samples\AzureManagedIdentity\AzureManagedIdentity.csproj", "{A9AA8DC8-F463-4BB2-AD7B-59060C758862}"
Expand Down Expand Up @@ -158,6 +160,10 @@ Global
{C369A8E1-F29D-4705-BD38-28C3DE80D8DB}.Debug|Any CPU.Build.0 = Debug|Any CPU
{C369A8E1-F29D-4705-BD38-28C3DE80D8DB}.Release|Any CPU.ActiveCfg = Release|Any CPU
{C369A8E1-F29D-4705-BD38-28C3DE80D8DB}.Release|Any CPU.Build.0 = Release|Any CPU
{63002328-9833-4FF3-9CE8-9134771E2455}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{63002328-9833-4FF3-9CE8-9134771E2455}.Debug|Any CPU.Build.0 = Debug|Any CPU
{63002328-9833-4FF3-9CE8-9134771E2455}.Release|Any CPU.ActiveCfg = Release|Any CPU
{63002328-9833-4FF3-9CE8-9134771E2455}.Release|Any CPU.Build.0 = Release|Any CPU
{3759B206-BF8D-4E46-9B04-1C19F156D295}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{3759B206-BF8D-4E46-9B04-1C19F156D295}.Debug|Any CPU.Build.0 = Debug|Any CPU
{3759B206-BF8D-4E46-9B04-1C19F156D295}.Release|Any CPU.ActiveCfg = Release|Any CPU
Expand Down Expand Up @@ -233,6 +239,7 @@ Global
{E4D62A60-39E4-401E-B146-0EA8DA272664} = {BDD324B6-9EFC-49A3-9CF6-6CE494446C4B}
{F827E66C-53DE-4366-A552-0320B3563294} = {BDD324B6-9EFC-49A3-9CF6-6CE494446C4B}
{C369A8E1-F29D-4705-BD38-28C3DE80D8DB} = {62F603F3-FF36-4E36-AC0C-08D1883525BE}
{63002328-9833-4FF3-9CE8-9134771E2455} = {62F603F3-FF36-4E36-AC0C-08D1883525BE}
{3759B206-BF8D-4E46-9B04-1C19F156D295} = {62F603F3-FF36-4E36-AC0C-08D1883525BE}
{A9AA8DC8-F463-4BB2-AD7B-59060C758862} = {62F603F3-FF36-4E36-AC0C-08D1883525BE}
{8E115759-87CC-4F45-9679-A9EBBD59992B} = {62F603F3-FF36-4E36-AC0C-08D1883525BE}
Expand Down
12 changes: 12 additions & 0 deletions samples/AotSupport/AotSupport.csproj
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
<Project Sdk="Microsoft.NET.Sdk.Worker">

<PropertyGroup>
<PublishAot>true</PublishAot>
<InvariantGlobalization>true</InvariantGlobalization>
</PropertyGroup>

<ItemGroup>
<ProjectReference Include="..\..\src\Tingle.EventBus.Transports.InMemory\Tingle.EventBus.Transports.InMemory.csproj" />
</ItemGroup>

</Project>
88 changes: 88 additions & 0 deletions samples/AotSupport/Program.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,88 @@
using System.Text.Json.Serialization;
using Tingle.EventBus.Serialization;

var host = Host.CreateDefaultBuilder(args)
.ConfigureServices((hostContext, services) =>
{
services.AddSlimEventBus(CustomSrializerContext.Default, builder =>
{
builder.AddConsumer<VideoUploaded, VideoUploadedConsumer>();
builder.AddDeadLetteredConsumer<VideoUploaded, VideoUploadedConsumer>();

builder.AddInMemoryTransport();
});

services.AddHostedService<ProducerService>();
})
.Build();

await host.RunAsync();

class ProducerService(IEventPublisher publisher) : BackgroundService
{
protected override async Task ExecuteAsync(CancellationToken stoppingToken)
{
var delay = TimeSpan.FromSeconds(25);
var times = 5;

var rnd = new Random(DateTimeOffset.UtcNow.Millisecond);

for (var i = 0; i < times; i++)
{
var evt = new VideoUploaded
{
VideoId = Convert.ToUInt32(rnd.Next()).ToString(),
SizeBytes = Convert.ToUInt32(rnd.Next()),
};

evt.Url = $"https://localhost:8080/uploads/raw/{evt.VideoId}.flv";

await publisher.PublishAsync(evt, cancellationToken: stoppingToken);

await Task.Delay(delay, stoppingToken);
}
}
}

class VideoUploadedConsumer(ILogger<VideoUploadedConsumer> logger) : IEventConsumer<VideoUploaded>, IDeadLetteredEventConsumer<VideoUploaded>
{
private static readonly TimeSpan SimulationDuration = TimeSpan.FromSeconds(3);

public async Task ConsumeAsync(EventContext<VideoUploaded> context, CancellationToken cancellationToken = default)
{
var evt = context.Event;
var videoId = evt.VideoId;
logger.LogInformation("Received event Id: {Id} for video '{VideoId}'.", context.Id, videoId);

// Download video locally
logger.LogInformation("Downloading video from {VideoUrl} ({VideoSize} bytes).", evt.Url, evt.SizeBytes);
await Task.Delay(SimulationDuration, cancellationToken); // simulate using delay

// Extract thumbnail from video
logger.LogInformation("Extracting thumbnail from video with Id '{VideoId}'.", videoId);
await Task.Delay(SimulationDuration, cancellationToken); // simulate using delay

// Upload video thumbnail
var thumbnailUrl = $"https://localhost:8080/uploads/thumbnails/{videoId}.jpg";
logger.LogInformation("Uploading thumbnail for video with Id '{VideoId}' to '{ThumbnailUrl}'.", videoId, thumbnailUrl);
await Task.Delay(SimulationDuration, cancellationToken); // simulate using delay

logger.LogInformation("Processing video with Id '{VideoId}' completed.", videoId);
}

public Task ConsumeAsync(DeadLetteredEventContext<VideoUploaded> context, CancellationToken cancellationToken)
{
logger.LogWarning("Event with Id '{Id}' for video '{VideoId}' was dead-lettered.", context.Id, context.Event.VideoId);
return Task.CompletedTask;
}
}

class VideoUploaded
{
public string? VideoId { get; set; }
public string? Url { get; set; }
public long SizeBytes { get; set; }
}

[JsonSerializable(typeof(EventEnvelope<VideoUploaded>))]
partial class CustomSrializerContext : JsonSerializerContext { }
10 changes: 10 additions & 0 deletions samples/AotSupport/Properties/launchSettings.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
{
"profiles": {
"AotSupport": {
"commandName": "Project",
"environmentVariables": {
"DOTNET_ENVIRONMENT": "Development"
}
}
}
}
16 changes: 16 additions & 0 deletions samples/AotSupport/appsettings.Development.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
{
"Logging": {
"LogLevel": {
"Default": "Debug",
"Microsoft": "Information",
"System": "Information"
},
"Console": {
"FormatterName": "simple",
"FormatterOptions": {
"SingleLine": true,
"TimestampFormat": "HH:mm:ss "
}
}
}
}
9 changes: 9 additions & 0 deletions samples/AotSupport/appsettings.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
{
"Logging": {
"LogLevel": {
"Default": "Information",
"Microsoft": "Warning",
"Microsoft.Hosting.Lifetime": "Information"
}
}
}
2 changes: 1 addition & 1 deletion src/Directory.Build.props
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
<Company>Tingle Software</Company>
<Authors>Tingle Software</Authors>
<PackageTags>EventBus</PackageTags>
<IsTrimmable Condition="$([MSBuild]::IsTargetFrameworkCompatible('$(TargetFramework)', 'net6.0'))">true</IsTrimmable>
<IsAotCompatible>true</IsAotCompatible>
</PropertyGroup>

<PropertyGroup>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
<Description>A serializer implementation for Tingle.EventBus using Newtonsoft.Json</Description>
<PackageTags>$(PackageTags);Serializers;Newtonsoft</PackageTags>
<RootNamespace>Tingle.EventBus.Serializers</RootNamespace>
<IsTrimmable>false</IsTrimmable>
<IsAotCompatible>false</IsAotCompatible>
</PropertyGroup>

<ItemGroup>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,9 @@
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options;
using System.Diagnostics.CodeAnalysis;
using Tingle.EventBus.Configuration;
using Tingle.EventBus.Internal;

namespace Tingle.EventBus.Transports.Amazon.Kinesis;

Expand Down Expand Up @@ -62,10 +64,10 @@ protected override Task StopCoreAsync(CancellationToken cancellationToken)
}

/// <inheritdoc/>
protected override async Task<ScheduledResult?> PublishCoreAsync<TEvent>(EventContext<TEvent> @event,
EventRegistration registration,
DateTimeOffset? scheduled = null,
CancellationToken cancellationToken = default)
protected override async Task<ScheduledResult?> PublishCoreAsync<[DynamicallyAccessedMembers(TrimmingHelper.Event)] TEvent>(EventContext<TEvent> @event,
EventRegistration registration,
DateTimeOffset? scheduled = null,
CancellationToken cancellationToken = default)
{
// log warning when trying to publish scheduled message
if (scheduled != null)
Expand Down Expand Up @@ -98,10 +100,10 @@ protected override Task StopCoreAsync(CancellationToken cancellationToken)
}

/// <inheritdoc/>
protected override async Task<IList<ScheduledResult>?> PublishCoreAsync<TEvent>(IList<EventContext<TEvent>> events,
EventRegistration registration,
DateTimeOffset? scheduled = null,
CancellationToken cancellationToken = default)
protected override async Task<IList<ScheduledResult>?> PublishCoreAsync<[DynamicallyAccessedMembers(TrimmingHelper.Event)] TEvent>(IList<EventContext<TEvent>> events,
EventRegistration registration,
DateTimeOffset? scheduled = null,
CancellationToken cancellationToken = default)
{
// log warning when trying to publish scheduled message
if (scheduled != null)
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
using Tingle.EventBus;
using System.Diagnostics.CodeAnalysis;
using Tingle.EventBus;
using Tingle.EventBus.Internal;

namespace Microsoft.Extensions.Logging;

Expand All @@ -22,7 +24,7 @@ public static void SendingEventsToStream(this ILogger logger, IList<string?> eve
eventBusIds: string.Join("\r\n- ", eventBusIds));
}

public static void SendingEventsToStream<T>(this ILogger logger, IList<EventContext<T>> events, string entityPath, DateTimeOffset? scheduled = null)
public static void SendingEventsToStream<[DynamicallyAccessedMembers(TrimmingHelper.Event)] T>(this ILogger logger, IList<EventContext<T>> events, string entityPath, DateTimeOffset? scheduled = null)
where T : class
{
if (!logger.IsEnabled(LogLevel.Information)) return;
Expand Down
56 changes: 21 additions & 35 deletions src/Tingle.EventBus.Transports.Amazon.Sqs/AmazonSqsTransport.cs
Original file line number Diff line number Diff line change
Expand Up @@ -90,10 +90,10 @@ protected override async Task StopCoreAsync(CancellationToken cancellationToken)
}

/// <inheritdoc/>
protected override async Task<ScheduledResult?> PublishCoreAsync<TEvent>(EventContext<TEvent> @event,
EventRegistration registration,
DateTimeOffset? scheduled = null,
CancellationToken cancellationToken = default)
protected override async Task<ScheduledResult?> PublishCoreAsync<[DynamicallyAccessedMembers(TrimmingHelper.Event)] TEvent>(EventContext<TEvent> @event,
EventRegistration registration,
DateTimeOffset? scheduled = null,
CancellationToken cancellationToken = default)
{
using var scope = CreateScope();
var body = await SerializeAsync(scope: scope,
Expand Down Expand Up @@ -154,10 +154,10 @@ protected override async Task StopCoreAsync(CancellationToken cancellationToken)
}

/// <inheritdoc/>
protected override async Task<IList<ScheduledResult>?> PublishCoreAsync<TEvent>(IList<EventContext<TEvent>> events,
EventRegistration registration,
DateTimeOffset? scheduled = null,
CancellationToken cancellationToken = default)
protected override async Task<IList<ScheduledResult>?> PublishCoreAsync<[DynamicallyAccessedMembers(TrimmingHelper.Event)] TEvent>(IList<EventContext<TEvent>> events,
EventRegistration registration,
DateTimeOffset? scheduled = null,
CancellationToken cancellationToken = default)
{
using var scope = CreateScope();
var sequenceNumbers = new List<string>();
Expand Down Expand Up @@ -350,10 +350,6 @@ private async Task<string> CreateQueueIfNotExistsAsync(string queueName, EventRe

private async Task ReceiveAsync(EventRegistration reg, EventConsumerRegistration ecr, string queueUrl, CancellationToken cancellationToken)
{
var flags = System.Reflection.BindingFlags.Instance | System.Reflection.BindingFlags.NonPublic;
var mt = GetType().GetMethod(nameof(OnMessageReceivedAsync), flags) ?? throw new InvalidOperationException("Methods should be null");
var method = mt.MakeGenericMethod(reg.EventType, ecr.ConsumerType);

while (!cancellationToken.IsCancellationRequested)
{
try
Expand All @@ -375,7 +371,7 @@ private async Task ReceiveAsync(EventRegistration reg, EventConsumerRegistration
using var scope = CreateScope(); // shared
foreach (var message in messages)
{
await ((Task)method.Invoke(this, [reg, ecr, queueUrl, message, cancellationToken])!).ConfigureAwait(false);
await OnMessageReceivedAsync(reg, ecr, queueUrl, message, cancellationToken).ConfigureAwait(false);
}
}
}
Expand All @@ -392,13 +388,7 @@ private async Task ReceiveAsync(EventRegistration reg, EventConsumerRegistration
}
}

private async Task OnMessageReceivedAsync<TEvent, [DynamicallyAccessedMembers(TrimmingHelper.Consumer)] TConsumer>(EventRegistration reg,
EventConsumerRegistration ecr,
string queueUrl,
Message message,
CancellationToken cancellationToken)
where TEvent : class
where TConsumer : IEventConsumer
private async Task OnMessageReceivedAsync(EventRegistration reg, EventConsumerRegistration ecr, string queueUrl, Message message, CancellationToken cancellationToken)
{
var messageId = message.MessageId;
message.TryGetAttribute(MetadataNames.CorrelationId, out var correlationId);
Expand All @@ -418,8 +408,8 @@ private async Task ReceiveAsync(EventRegistration reg, EventConsumerRegistration

// Instrumentation
using var activity = EventBusActivitySource.StartActivity(ActivityNames.Consume, ActivityKind.Consumer, parentActivityId);
activity?.AddTag(ActivityTagNames.EventBusEventType, typeof(TEvent).FullName);
activity?.AddTag(ActivityTagNames.EventBusConsumerType, typeof(TConsumer).FullName);
activity?.AddTag(ActivityTagNames.EventBusEventType, reg.EventType.FullName);
activity?.AddTag(ActivityTagNames.EventBusConsumerType, ecr.ConsumerType.FullName);
activity?.AddTag(ActivityTagNames.MessagingSystem, Name);
activity?.AddTag(ActivityTagNames.MessagingDestination, reg.EventName);
activity?.AddTag(ActivityTagNames.MessagingDestinationKind, "queue");
Expand All @@ -430,22 +420,18 @@ private async Task ReceiveAsync(EventRegistration reg, EventConsumerRegistration
var contentType = contentType_str == null ? null : new ContentType(contentType_str);

using var scope = CreateScope();
var context = await DeserializeAsync<TEvent>(scope: scope,
body: new BinaryData(message.Body),
contentType: contentType,
registration: reg,
identifier: messageId,
raw: message,
deadletter: ecr.Deadletter,
cancellationToken: cancellationToken).ConfigureAwait(false);
var context = await DeserializeAsync(scope: scope,
body: new BinaryData(message.Body),
contentType: contentType,
registration: reg,
identifier: messageId,
raw: message,
deadletter: ecr.Deadletter,
cancellationToken: cancellationToken).ConfigureAwait(false);

Logger.ReceivedMessage(messageId: messageId, eventBusId: context.Id, queueUrl: queueUrl);

var (successful, _) = await ConsumeAsync<TEvent, TConsumer>(registration: reg,
ecr: ecr,
@event: context,
scope: scope,
cancellationToken: cancellationToken).ConfigureAwait(false);
var (successful, _) = await ConsumeAsync(scope, reg, ecr, context, cancellationToken).ConfigureAwait(false);

// dead-letter cannot be dead-lettered again, what else can we do?
if (ecr.Deadletter) return; // TODO: figure out what to do when dead-letter fails
Expand Down
Loading

0 comments on commit 1d1bfff

Please sign in to comment.