From 07f9da5c10714671f8d010d8ec2c4a2e8f7a7fe2 Mon Sep 17 00:00:00 2001 From: Weihan Li Date: Sat, 4 Jan 2025 00:46:54 +0800 Subject: [PATCH 1/4] refactor: update EventExtensions --- src/WeihanLi.Common/Event/EventBase.cs | 50 +++++++++++++++++++++----- 1 file changed, 42 insertions(+), 8 deletions(-) diff --git a/src/WeihanLi.Common/Event/EventBase.cs b/src/WeihanLi.Common/Event/EventBase.cs index 1ae94979..eb96d25b 100644 --- a/src/WeihanLi.Common/Event/EventBase.cs +++ b/src/WeihanLi.Common/Event/EventBase.cs @@ -67,26 +67,60 @@ public interface IEvent public class EventWrapper : IEvent, IEvent { public required T Data { get; init; } - object? IEvent.Data => (object?)Data; + object? IEvent.Data => Data; public required EventProperties Properties { get; init; } } -public static class EventBaseExtensions +public static class EventExtensions { - private static readonly JsonSerializerSettings EventSerializerSettings = JsonSerializeExtension.SerializerSettingsWith(s => - { - s.TypeNameHandling = TypeNameHandling.Objects; - }); + private static readonly JsonSerializerSettings EventSerializerSettings = JsonSerializeExtension + .SerializerSettingsWith(s => + { + s.NullValueHandling = NullValueHandling.Ignore; + s.TypeNameHandling = TypeNameHandling.Objects; + }); public static string ToEventMsg(this TEvent @event) { Guard.NotNull(@event); - return @event.ToJson(EventSerializerSettings); + return GetEvent(@event).ToJson(EventSerializerSettings); + } + + private static IEvent GetEvent(this TEvent @event) + { + if (@event is IEvent eventEvent) + return eventEvent; + + if (@event is IEventBase eventBase) + return new EventWrapper() + { + Data = @event, + Properties = new() + { + EventAt = eventBase.EventAt, + EventId = eventBase.EventId + } + }; + + return new EventWrapper + { + Data = @event, + Properties = new EventProperties + { + EventAt = DateTimeOffset.Now + } + }; } - public static IEventBase ToEvent(this string eventMsg) + public static IEventBase ToEventBase(this string eventMsg) { Guard.NotNull(eventMsg); return eventMsg.JsonToObject(EventSerializerSettings); } + + public static IEvent ToEvent(this string eventMsg) + { + Guard.NotNull(eventMsg); + return eventMsg.JsonToObject(EventSerializerSettings); + } } From 310abe4c03723d8220aef7740fbb98e19d2a8adb Mon Sep 17 00:00:00 2001 From: Weihan Li Date: Sat, 4 Jan 2025 00:49:38 +0800 Subject: [PATCH 2/4] refactor: update ToEvent extension --- src/WeihanLi.Common/Event/EventBase.cs | 10 +++------- 1 file changed, 3 insertions(+), 7 deletions(-) diff --git a/src/WeihanLi.Common/Event/EventBase.cs b/src/WeihanLi.Common/Event/EventBase.cs index eb96d25b..4b01df48 100644 --- a/src/WeihanLi.Common/Event/EventBase.cs +++ b/src/WeihanLi.Common/Event/EventBase.cs @@ -112,15 +112,11 @@ private static IEvent GetEvent(this TEvent @event) }; } - public static IEventBase ToEventBase(this string eventMsg) + public static TEvent ToEvent(this string eventMsg) { Guard.NotNull(eventMsg); - return eventMsg.JsonToObject(EventSerializerSettings); + return eventMsg.JsonToObject(EventSerializerSettings); } - public static IEvent ToEvent(this string eventMsg) - { - Guard.NotNull(eventMsg); - return eventMsg.JsonToObject(EventSerializerSettings); - } + public static IEvent ToEvent(this string eventMsg) => ToEvent(eventMsg); } From 815e08307093fb922512be96859f09b38636a21a Mon Sep 17 00:00:00 2001 From: Weihan Li Date: Sat, 4 Jan 2025 01:05:22 +0800 Subject: [PATCH 3/4] refactor: update EventQueue --- .../AspNetCoreSample/Events/EventConsumer.cs | 16 ++++----- .../Event/EventQueueInMemory.cs | 35 +++++++++++-------- src/WeihanLi.Common/Event/IEventQueue.cs | 12 +++---- 3 files changed, 33 insertions(+), 30 deletions(-) diff --git a/samples/AspNetCoreSample/Events/EventConsumer.cs b/samples/AspNetCoreSample/Events/EventConsumer.cs index d244e9d7..99bfb45a 100644 --- a/samples/AspNetCoreSample/Events/EventConsumer.cs +++ b/samples/AspNetCoreSample/Events/EventConsumer.cs @@ -1,4 +1,5 @@ -using WeihanLi.Common.Event; +using WeihanLi.Common; +using WeihanLi.Common.Event; using WeihanLi.Extensions; namespace AspNetCoreSample.Events; @@ -7,25 +8,24 @@ public class EventConsumer (IEventQueue eventQueue, IEventHandlerFactory eventHandlerFactory) : BackgroundService { - private readonly IEventQueue _eventQueue = eventQueue; - private readonly IEventHandlerFactory _eventHandlerFactory = eventHandlerFactory; - protected override async Task ExecuteAsync(CancellationToken stoppingToken) { while (!stoppingToken.IsCancellationRequested) { - var queues = await _eventQueue.GetQueuesAsync(); + var queues = await eventQueue.GetQueuesAsync(); if (queues.Count > 0) { await queues.Select(async q => { - if (await _eventQueue.TryDequeueAsync(q, out var @event, out var properties)) + await foreach (var e in eventQueue.ReadAllEvents(q, stoppingToken)) { - var handlers = _eventHandlerFactory.GetHandlers(@event.GetType()); + var @event = e.Data; + Guard.NotNull(@event); + var handlers = eventHandlerFactory.GetHandlers(@event.GetType()); if (handlers.Count > 0) { await handlers - .Select(h => h.Handle(@event, properties)) + .Select(h => h.Handle(@event, e.Properties)) .WhenAll() ; } diff --git a/src/WeihanLi.Common/Event/EventQueueInMemory.cs b/src/WeihanLi.Common/Event/EventQueueInMemory.cs index cca4019e..8f24ca6c 100644 --- a/src/WeihanLi.Common/Event/EventQueueInMemory.cs +++ b/src/WeihanLi.Common/Event/EventQueueInMemory.cs @@ -42,25 +42,21 @@ public Task EnqueueAsync(string queueName, TEvent @event, EventPro return Task.FromResult(true); } - public Task TryDequeueAsync(string queueName, [NotNullWhen(true)] out object? @event, [NotNullWhen(true)] out EventProperties? properties) + public Task?> DequeueAsync(string queueName) { - @event = default; - properties = default; - if (_eventQueues.TryGetValue(queueName, out var queue)) { if (queue.TryDequeue(out var eventWrapper)) { - @event = eventWrapper.Data; - properties = eventWrapper.Properties; - return Task.FromResult(true); + return Task.FromResult((IEvent?)eventWrapper); } } - return Task.FromResult(false); + return Task.FromResult?>(null); } - internal async IAsyncEnumerable<(TEvent Event, EventProperties Properties)> ReadAllAsync(string queueName, [EnumeratorCancellation] CancellationToken cancellationToken = default) + public async IAsyncEnumerable ReadAllEvents(string queueName, + [EnumeratorCancellation] CancellationToken cancellationToken = default) { while (!cancellationToken.IsCancellationRequested) { @@ -68,15 +64,26 @@ public Task TryDequeueAsync(string queueName, [NotNullWhen(true)] out obje { while (queue.TryDequeue(out var eventWrapper)) { - yield return ((TEvent)eventWrapper!.Data!, eventWrapper!.Properties); + yield return eventWrapper; } } - await Task.Delay(100); + await Task.Delay(100, cancellationToken); } } - - public bool TryRemoveQueue(string queueName) + + public async IAsyncEnumerable> ReadAllEvents(string queueName, + [EnumeratorCancellation]CancellationToken cancellationToken = default) { - return _eventQueues.TryRemove(queueName, out _); + while (!cancellationToken.IsCancellationRequested) + { + if (_eventQueues.TryGetValue(queueName, out var queue)) + { + while (queue.TryDequeue(out var eventWrapper) && eventWrapper is IEvent @event) + { + yield return @event; + } + } + await Task.Delay(100, cancellationToken); + } } } diff --git a/src/WeihanLi.Common/Event/IEventQueue.cs b/src/WeihanLi.Common/Event/IEventQueue.cs index 7d09ef3b..6836b70b 100644 --- a/src/WeihanLi.Common/Event/IEventQueue.cs +++ b/src/WeihanLi.Common/Event/IEventQueue.cs @@ -1,19 +1,15 @@ // Copyright (c) Weihan Li. All rights reserved. // Licensed under the Apache license. -using System.Diagnostics.CodeAnalysis; - namespace WeihanLi.Common.Event; public interface IEventQueue { Task> GetQueuesAsync(); - Task EnqueueAsync(string queueName, TEvent @event, EventProperties? properties = null); - - Task TryDequeueAsync(string queueName, [MaybeNullWhen(false)] out object @event, [MaybeNullWhen(false)] out EventProperties properties); - - // IAsyncEnumerable<(TEvent Event, EventProperties Properties)> ReadAllEvents(string queueName, CancellationToken cancellationToken = default); + Task?> DequeueAsync(string queueName); + IAsyncEnumerable> ReadAllEvents(string queueName, CancellationToken cancellationToken = default); + IAsyncEnumerable ReadAllEvents(string queueName, CancellationToken cancellationToken = default); } public static class EventQueueExtensions @@ -23,6 +19,6 @@ public static class EventQueueExtensions public static Task EnqueueAsync(this IEventQueue eventQueue, TEvent @event, EventProperties? properties = null) where TEvent : class { - return eventQueue.EnqueueAsync(DefaultQueueName, @event); + return eventQueue.EnqueueAsync(DefaultQueueName, @event, properties); } } From f37b5548a6e2f80528ad11375aa24da9362ad896 Mon Sep 17 00:00:00 2001 From: Weihan Li Date: Sat, 4 Jan 2025 01:20:14 +0800 Subject: [PATCH 4/4] refactor: refine events --- .../AspNetCoreSample/Events/EventConsumer.cs | 2 +- src/WeihanLi.Common/Event/EventBase.cs | 6 +++++ .../Event/EventHandlerFactory.cs | 4 +--- .../Event/EventQueueInMemory.cs | 4 ++-- src/WeihanLi.Common/Event/IEventQueue.cs | 4 ++-- .../ServiceContainerBuilderBuildTest.cs | 12 ---------- .../EventsTest/EventBaseTest.cs | 24 +++++++++++++++++-- 7 files changed, 34 insertions(+), 22 deletions(-) diff --git a/samples/AspNetCoreSample/Events/EventConsumer.cs b/samples/AspNetCoreSample/Events/EventConsumer.cs index 99bfb45a..dbe21da1 100644 --- a/samples/AspNetCoreSample/Events/EventConsumer.cs +++ b/samples/AspNetCoreSample/Events/EventConsumer.cs @@ -17,7 +17,7 @@ protected override async Task ExecuteAsync(CancellationToken stoppingToken) { await queues.Select(async q => { - await foreach (var e in eventQueue.ReadAllEvents(q, stoppingToken)) + await foreach (var e in eventQueue.ReadAll(q, stoppingToken)) { var @event = e.Data; Guard.NotNull(@event); diff --git a/src/WeihanLi.Common/Event/EventBase.cs b/src/WeihanLi.Common/Event/EventBase.cs index 4b01df48..09ba4a74 100644 --- a/src/WeihanLi.Common/Event/EventBase.cs +++ b/src/WeihanLi.Common/Event/EventBase.cs @@ -85,6 +85,12 @@ public static string ToEventMsg(this TEvent @event) Guard.NotNull(@event); return GetEvent(@event).ToJson(EventSerializerSettings); } + + public static string ToEventRawMsg(this TEvent @event) + { + Guard.NotNull(@event); + return @event.ToJson(EventSerializerSettings); + } private static IEvent GetEvent(this TEvent @event) { diff --git a/src/WeihanLi.Common/Event/EventHandlerFactory.cs b/src/WeihanLi.Common/Event/EventHandlerFactory.cs index 6c4e4cfd..229b8eb7 100644 --- a/src/WeihanLi.Common/Event/EventHandlerFactory.cs +++ b/src/WeihanLi.Common/Event/EventHandlerFactory.cs @@ -7,12 +7,10 @@ namespace WeihanLi.Common.Event; public sealed class DefaultEventHandlerFactory(IEventSubscriptionManager subscriptionManager) : IEventHandlerFactory { - private readonly IEventSubscriptionManager _subscriptionManager = subscriptionManager; - [RequiresUnreferencedCode("Unreferenced code may be used")] public ICollection GetHandlers(Type eventType) { - var eventHandlers = _subscriptionManager.GetEventHandlers(eventType); + var eventHandlers = subscriptionManager.GetEventHandlers(eventType); return eventHandlers; } } diff --git a/src/WeihanLi.Common/Event/EventQueueInMemory.cs b/src/WeihanLi.Common/Event/EventQueueInMemory.cs index 8f24ca6c..2d8738b4 100644 --- a/src/WeihanLi.Common/Event/EventQueueInMemory.cs +++ b/src/WeihanLi.Common/Event/EventQueueInMemory.cs @@ -55,7 +55,7 @@ public Task EnqueueAsync(string queueName, TEvent @event, EventPro return Task.FromResult?>(null); } - public async IAsyncEnumerable ReadAllEvents(string queueName, + public async IAsyncEnumerable ReadAll(string queueName, [EnumeratorCancellation] CancellationToken cancellationToken = default) { while (!cancellationToken.IsCancellationRequested) @@ -71,7 +71,7 @@ public async IAsyncEnumerable ReadAllEvents(string queueName, } } - public async IAsyncEnumerable> ReadAllEvents(string queueName, + public async IAsyncEnumerable> ReadEvents(string queueName, [EnumeratorCancellation]CancellationToken cancellationToken = default) { while (!cancellationToken.IsCancellationRequested) diff --git a/src/WeihanLi.Common/Event/IEventQueue.cs b/src/WeihanLi.Common/Event/IEventQueue.cs index 6836b70b..5d1ddf53 100644 --- a/src/WeihanLi.Common/Event/IEventQueue.cs +++ b/src/WeihanLi.Common/Event/IEventQueue.cs @@ -8,8 +8,8 @@ public interface IEventQueue Task> GetQueuesAsync(); Task EnqueueAsync(string queueName, TEvent @event, EventProperties? properties = null); Task?> DequeueAsync(string queueName); - IAsyncEnumerable> ReadAllEvents(string queueName, CancellationToken cancellationToken = default); - IAsyncEnumerable ReadAllEvents(string queueName, CancellationToken cancellationToken = default); + IAsyncEnumerable ReadAll(string queueName, CancellationToken cancellationToken = default); + IAsyncEnumerable> ReadEvents(string queueName, CancellationToken cancellationToken = default); } public static class EventQueueExtensions diff --git a/test/WeihanLi.Common.Test/AspectTest/ServiceContainerBuilderBuildTest.cs b/test/WeihanLi.Common.Test/AspectTest/ServiceContainerBuilderBuildTest.cs index 8cfe4028..01d5d9b4 100644 --- a/test/WeihanLi.Common.Test/AspectTest/ServiceContainerBuilderBuildTest.cs +++ b/test/WeihanLi.Common.Test/AspectTest/ServiceContainerBuilderBuildTest.cs @@ -99,18 +99,6 @@ public async Task ClassTest() await handTask; } - [Fact] - public async Task GenericMethodTest() - { - var publisher = _serviceProvider.ResolveRequiredService(); - Assert.NotNull(publisher); - var publisherType = publisher.GetType(); - Assert.True(publisherType.IsSealed); - Assert.True(publisherType.Assembly.IsDynamic); - - await publisher.PublishAsync(new TestEvent()); - } - // not supported, will not intercept [Fact] public void OpenGenericTypeTest() diff --git a/test/WeihanLi.Common.Test/EventsTest/EventBaseTest.cs b/test/WeihanLi.Common.Test/EventsTest/EventBaseTest.cs index cd815875..9a686a3d 100644 --- a/test/WeihanLi.Common.Test/EventsTest/EventBaseTest.cs +++ b/test/WeihanLi.Common.Test/EventsTest/EventBaseTest.cs @@ -43,8 +43,8 @@ public void EventMessageExtensionsTest() { Name = "1213" }; - var eventMsg = testEvent.ToEventMsg(); - var eventFromMsg = eventMsg.ToEvent(); + var eventMsg = testEvent.ToEventRawMsg(); + var eventFromMsg = eventMsg.ToEvent(); Assert.Equal(typeof(TestEvent), eventFromMsg.GetType()); var deserializedEvent = eventFromMsg as TestEvent; @@ -53,4 +53,24 @@ public void EventMessageExtensionsTest() Assert.Equal(testEvent.EventAt, deserializedEvent.EventAt); Assert.Equal(testEvent.Name, deserializedEvent.Name); } + + [Fact] + public void EventMessageExtensions2Test() + { + var testEvent = new TestEvent() + { + Name = "1213" + }; + var eventMsg = testEvent.ToEventMsg(); + var eventFromMsg = eventMsg.ToEvent>(); + Assert.Equal(typeof(EventWrapper), eventFromMsg.GetType()); + + var deserializedEvent = eventFromMsg.Data; + Assert.NotNull(deserializedEvent); + Assert.Equal(testEvent.EventId, deserializedEvent.EventId); + Assert.Equal(testEvent.EventAt, deserializedEvent.EventAt); + Assert.Equal(testEvent.Name, deserializedEvent.Name); + Assert.Equal(testEvent.EventId, eventFromMsg.Properties.EventId); + Assert.Equal(testEvent.EventAt, eventFromMsg.Properties.EventAt); + } }