Skip to content

Commit

Permalink
Merge pull request #238 from WeihanLi/dev
Browse files Browse the repository at this point in the history
1.0.74 preview 1
  • Loading branch information
WeihanLi authored Jan 4, 2025
2 parents 886f09c + f37b554 commit 607da09
Show file tree
Hide file tree
Showing 7 changed files with 100 additions and 55 deletions.
16 changes: 8 additions & 8 deletions samples/AspNetCoreSample/Events/EventConsumer.cs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
using WeihanLi.Common.Event;
using WeihanLi.Common;
using WeihanLi.Common.Event;
using WeihanLi.Extensions;

namespace AspNetCoreSample.Events;
Expand All @@ -7,25 +8,24 @@ public class EventConsumer
(IEventQueue eventQueue, IEventHandlerFactory eventHandlerFactory)
: BackgroundService
{
private readonly IEventQueue _eventQueue = eventQueue;
private readonly IEventHandlerFactory _eventHandlerFactory = eventHandlerFactory;

protected override async Task ExecuteAsync(CancellationToken stoppingToken)
{
while (!stoppingToken.IsCancellationRequested)
{
var queues = await _eventQueue.GetQueuesAsync();
var queues = await eventQueue.GetQueuesAsync();
if (queues.Count > 0)
{
await queues.Select(async q =>
{
if (await _eventQueue.TryDequeueAsync(q, out var @event, out var properties))
await foreach (var e in eventQueue.ReadAll(q, stoppingToken))
{
var handlers = _eventHandlerFactory.GetHandlers(@event.GetType());
var @event = e.Data;
Guard.NotNull(@event);
var handlers = eventHandlerFactory.GetHandlers(@event.GetType());
if (handlers.Count > 0)
{
await handlers
.Select(h => h.Handle(@event, properties))
.Select(h => h.Handle(@event, e.Properties))
.WhenAll()
;
}
Expand Down
52 changes: 44 additions & 8 deletions src/WeihanLi.Common/Event/EventBase.cs
Original file line number Diff line number Diff line change
Expand Up @@ -67,26 +67,62 @@ public interface IEvent<out T>
public class EventWrapper<T> : IEvent, IEvent<T>
{
public required T Data { get; init; }
object? IEvent.Data => (object?)Data;
object? IEvent.Data => Data;
public required EventProperties Properties { get; init; }
}

public static class EventBaseExtensions
public static class EventExtensions
{
private static readonly JsonSerializerSettings EventSerializerSettings = JsonSerializeExtension.SerializerSettingsWith(s =>
{
s.TypeNameHandling = TypeNameHandling.Objects;
});
private static readonly JsonSerializerSettings EventSerializerSettings = JsonSerializeExtension
.SerializerSettingsWith(s =>
{
s.NullValueHandling = NullValueHandling.Ignore;
s.TypeNameHandling = TypeNameHandling.Objects;
});

public static string ToEventMsg<TEvent>(this TEvent @event)
{
Guard.NotNull(@event);
return GetEvent(@event).ToJson(EventSerializerSettings);
}

public static string ToEventRawMsg<TEvent>(this TEvent @event)
{
Guard.NotNull(@event);
return @event.ToJson(EventSerializerSettings);
}

public static IEventBase ToEvent(this string eventMsg)
private static IEvent GetEvent<TEvent>(this TEvent @event)
{
if (@event is IEvent eventEvent)
return eventEvent;

if (@event is IEventBase eventBase)
return new EventWrapper<TEvent>()
{
Data = @event,
Properties = new()
{
EventAt = eventBase.EventAt,
EventId = eventBase.EventId
}
};

return new EventWrapper<TEvent>
{
Data = @event,
Properties = new EventProperties
{
EventAt = DateTimeOffset.Now
}
};
}

public static TEvent ToEvent<TEvent>(this string eventMsg)
{
Guard.NotNull(eventMsg);
return eventMsg.JsonToObject<IEventBase>(EventSerializerSettings);
return eventMsg.JsonToObject<TEvent>(EventSerializerSettings);
}

public static IEvent ToEvent(this string eventMsg) => ToEvent<IEvent>(eventMsg);
}
4 changes: 1 addition & 3 deletions src/WeihanLi.Common/Event/EventHandlerFactory.cs
Original file line number Diff line number Diff line change
Expand Up @@ -7,12 +7,10 @@ namespace WeihanLi.Common.Event;

public sealed class DefaultEventHandlerFactory(IEventSubscriptionManager subscriptionManager) : IEventHandlerFactory
{
private readonly IEventSubscriptionManager _subscriptionManager = subscriptionManager;

[RequiresUnreferencedCode("Unreferenced code may be used")]
public ICollection<IEventHandler> GetHandlers(Type eventType)
{
var eventHandlers = _subscriptionManager.GetEventHandlers(eventType);
var eventHandlers = subscriptionManager.GetEventHandlers(eventType);
return eventHandlers;
}
}
35 changes: 21 additions & 14 deletions src/WeihanLi.Common/Event/EventQueueInMemory.cs
Original file line number Diff line number Diff line change
Expand Up @@ -42,41 +42,48 @@ public Task<bool> EnqueueAsync<TEvent>(string queueName, TEvent @event, EventPro
return Task.FromResult(true);
}

public Task<bool> TryDequeueAsync(string queueName, [NotNullWhen(true)] out object? @event, [NotNullWhen(true)] out EventProperties? properties)
public Task<IEvent<TEvent>?> DequeueAsync<TEvent>(string queueName)
{
@event = default;
properties = default;

if (_eventQueues.TryGetValue(queueName, out var queue))
{
if (queue.TryDequeue(out var eventWrapper))
{
@event = eventWrapper.Data;
properties = eventWrapper.Properties;
return Task.FromResult(true);
return Task.FromResult((IEvent<TEvent>?)eventWrapper);
}
}

return Task.FromResult(false);
return Task.FromResult<IEvent<TEvent>?>(null);
}

internal async IAsyncEnumerable<(TEvent Event, EventProperties Properties)> ReadAllAsync<TEvent>(string queueName, [EnumeratorCancellation] CancellationToken cancellationToken = default)
public async IAsyncEnumerable<IEvent> ReadAll(string queueName,
[EnumeratorCancellation] CancellationToken cancellationToken = default)
{
while (!cancellationToken.IsCancellationRequested)
{
if (_eventQueues.TryGetValue(queueName, out var queue))
{
while (queue.TryDequeue(out var eventWrapper))
{
yield return ((TEvent)eventWrapper!.Data!, eventWrapper!.Properties);
yield return eventWrapper;
}
}
await Task.Delay(100);
await Task.Delay(100, cancellationToken);
}
}

public bool TryRemoveQueue(string queueName)

public async IAsyncEnumerable<IEvent<TEvent>> ReadEvents<TEvent>(string queueName,
[EnumeratorCancellation]CancellationToken cancellationToken = default)
{
return _eventQueues.TryRemove(queueName, out _);
while (!cancellationToken.IsCancellationRequested)
{
if (_eventQueues.TryGetValue(queueName, out var queue))
{
while (queue.TryDequeue(out var eventWrapper) && eventWrapper is IEvent<TEvent> @event)
{
yield return @event;
}
}
await Task.Delay(100, cancellationToken);
}
}
}
12 changes: 4 additions & 8 deletions src/WeihanLi.Common/Event/IEventQueue.cs
Original file line number Diff line number Diff line change
@@ -1,19 +1,15 @@
// Copyright (c) Weihan Li. All rights reserved.
// Licensed under the Apache license.

using System.Diagnostics.CodeAnalysis;

namespace WeihanLi.Common.Event;

public interface IEventQueue
{
Task<ICollection<string>> GetQueuesAsync();

Task<bool> EnqueueAsync<TEvent>(string queueName, TEvent @event, EventProperties? properties = null);

Task<bool> TryDequeueAsync(string queueName, [MaybeNullWhen(false)] out object @event, [MaybeNullWhen(false)] out EventProperties properties);

// IAsyncEnumerable<(TEvent Event, EventProperties Properties)> ReadAllEvents<TEvent>(string queueName, CancellationToken cancellationToken = default);
Task<IEvent<TEvent>?> DequeueAsync<TEvent>(string queueName);
IAsyncEnumerable<IEvent> ReadAll(string queueName, CancellationToken cancellationToken = default);
IAsyncEnumerable<IEvent<TEvent>> ReadEvents<TEvent>(string queueName, CancellationToken cancellationToken = default);
}

public static class EventQueueExtensions
Expand All @@ -23,6 +19,6 @@ public static class EventQueueExtensions
public static Task<bool> EnqueueAsync<TEvent>(this IEventQueue eventQueue, TEvent @event, EventProperties? properties = null)
where TEvent : class
{
return eventQueue.EnqueueAsync(DefaultQueueName, @event);
return eventQueue.EnqueueAsync(DefaultQueueName, @event, properties);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -99,18 +99,6 @@ public async Task ClassTest()
await handTask;
}

[Fact]
public async Task GenericMethodTest()
{
var publisher = _serviceProvider.ResolveRequiredService<IEventPublisher>();
Assert.NotNull(publisher);
var publisherType = publisher.GetType();
Assert.True(publisherType.IsSealed);
Assert.True(publisherType.Assembly.IsDynamic);

await publisher.PublishAsync(new TestEvent());
}

// not supported, will not intercept
[Fact]
public void OpenGenericTypeTest()
Expand Down
24 changes: 22 additions & 2 deletions test/WeihanLi.Common.Test/EventsTest/EventBaseTest.cs
Original file line number Diff line number Diff line change
Expand Up @@ -43,8 +43,8 @@ public void EventMessageExtensionsTest()
{
Name = "1213"
};
var eventMsg = testEvent.ToEventMsg();
var eventFromMsg = eventMsg.ToEvent();
var eventMsg = testEvent.ToEventRawMsg();
var eventFromMsg = eventMsg.ToEvent<TestEvent>();
Assert.Equal(typeof(TestEvent), eventFromMsg.GetType());

var deserializedEvent = eventFromMsg as TestEvent;
Expand All @@ -53,4 +53,24 @@ public void EventMessageExtensionsTest()
Assert.Equal(testEvent.EventAt, deserializedEvent.EventAt);
Assert.Equal(testEvent.Name, deserializedEvent.Name);
}

[Fact]
public void EventMessageExtensions2Test()
{
var testEvent = new TestEvent()
{
Name = "1213"
};
var eventMsg = testEvent.ToEventMsg();
var eventFromMsg = eventMsg.ToEvent<EventWrapper<TestEvent>>();
Assert.Equal(typeof(EventWrapper<TestEvent>), eventFromMsg.GetType());

var deserializedEvent = eventFromMsg.Data;
Assert.NotNull(deserializedEvent);
Assert.Equal(testEvent.EventId, deserializedEvent.EventId);
Assert.Equal(testEvent.EventAt, deserializedEvent.EventAt);
Assert.Equal(testEvent.Name, deserializedEvent.Name);
Assert.Equal(testEvent.EventId, eventFromMsg.Properties.EventId);
Assert.Equal(testEvent.EventAt, eventFromMsg.Properties.EventAt);
}
}

0 comments on commit 607da09

Please sign in to comment.