Skip to content

Commit

Permalink
Merge pull request #239 from WeihanLi/dev
Browse files Browse the repository at this point in the history
1.0.74 preview 1.1
  • Loading branch information
WeihanLi authored Jan 4, 2025
2 parents 607da09 + 8de85fe commit 5a5f507
Show file tree
Hide file tree
Showing 3 changed files with 47 additions and 26 deletions.
2 changes: 1 addition & 1 deletion samples/AspNetCoreSample/Events/EventConsumer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ protected override async Task ExecuteAsync(CancellationToken stoppingToken)
{
await queues.Select(async q =>
{
await foreach (var e in eventQueue.ReadAll(q, stoppingToken))
await foreach (var e in eventQueue.ReadAllAsync(q, stoppingToken))
{
var @event = e.Data;
Guard.NotNull(@event);
Expand Down
53 changes: 30 additions & 23 deletions src/WeihanLi.Common/Event/EventQueueInMemory.cs
Original file line number Diff line number Diff line change
Expand Up @@ -3,21 +3,26 @@

using System.Collections.Concurrent;
using System.Diagnostics;
using System.Diagnostics.CodeAnalysis;
using System.Runtime.CompilerServices;

#if NET
using System.Threading.Channels;
#endif

namespace WeihanLi.Common.Event;

public sealed class EventQueueInMemory : IEventQueue
{
#if NET
private readonly ConcurrentDictionary<string, Channel<IEvent>> _eventQueues = new();
#else
private readonly ConcurrentDictionary<string, ConcurrentQueue<IEvent>> _eventQueues = new();

#endif
public ICollection<string> GetQueues() => _eventQueues.Keys;

public Task<ICollection<string>> GetQueuesAsync() => Task.FromResult(GetQueues());


public Task<bool> EnqueueAsync<TEvent>(string queueName, TEvent @event, EventProperties? properties = null)
public async Task<bool> EnqueueAsync<TEvent>(string queueName, TEvent @event, EventProperties? properties = null)
{
properties ??= new();
if (string.IsNullOrEmpty(properties.EventId))
Expand All @@ -37,16 +42,26 @@ public Task<bool> EnqueueAsync<TEvent>(string queueName, TEvent @event, EventPro
Data = @event,
Properties = properties
};
#if NET
var queue = _eventQueues.GetOrAdd(queueName, _ => Channel.CreateUnbounded<IEvent>());
await queue.Writer.WriteAsync(internalEvent);
#else
var queue = _eventQueues.GetOrAdd(queueName, _ => new ConcurrentQueue<IEvent>());
queue.Enqueue(internalEvent);
return Task.FromResult(true);
await Task.CompletedTask;
#endif
return true;
}

public Task<IEvent<TEvent>?> DequeueAsync<TEvent>(string queueName)
{
if (_eventQueues.TryGetValue(queueName, out var queue))
{
#if NET
if (queue.Reader.TryRead(out var eventWrapper))
#else
if (queue.TryDequeue(out var eventWrapper))
#endif
{
return Task.FromResult((IEvent<TEvent>?)eventWrapper);
}
Expand All @@ -55,35 +70,27 @@ public Task<bool> EnqueueAsync<TEvent>(string queueName, TEvent @event, EventPro
return Task.FromResult<IEvent<TEvent>?>(null);
}

public async IAsyncEnumerable<IEvent> ReadAll(string queueName,
public async IAsyncEnumerable<IEvent> ReadAllAsync(string queueName,
[EnumeratorCancellation] CancellationToken cancellationToken = default)
{
while (!cancellationToken.IsCancellationRequested)
{
if (_eventQueues.TryGetValue(queueName, out var queue))
{
while (queue.TryDequeue(out var eventWrapper))
#if NET
await foreach (var @event in queue.Reader.ReadAllAsync(cancellationToken))
{
yield return eventWrapper;
yield return @event;
}
}
await Task.Delay(100, cancellationToken);
}
}

public async IAsyncEnumerable<IEvent<TEvent>> ReadEvents<TEvent>(string queueName,
[EnumeratorCancellation]CancellationToken cancellationToken = default)
{
while (!cancellationToken.IsCancellationRequested)
{
if (_eventQueues.TryGetValue(queueName, out var queue))
{
while (queue.TryDequeue(out var eventWrapper) && eventWrapper is IEvent<TEvent> @event)
#else
while (queue.TryDequeue(out var eventWrapper))
{
yield return @event;
yield return eventWrapper;
}
#endif
}
await Task.Delay(100, cancellationToken);

await Task.Delay(200, cancellationToken);
}
}
}
18 changes: 16 additions & 2 deletions src/WeihanLi.Common/Event/IEventQueue.cs
Original file line number Diff line number Diff line change
@@ -1,15 +1,16 @@
// Copyright (c) Weihan Li. All rights reserved.
// Licensed under the Apache license.

using System.Runtime.CompilerServices;

namespace WeihanLi.Common.Event;

public interface IEventQueue
{
Task<ICollection<string>> GetQueuesAsync();
Task<bool> EnqueueAsync<TEvent>(string queueName, TEvent @event, EventProperties? properties = null);
Task<IEvent<TEvent>?> DequeueAsync<TEvent>(string queueName);
IAsyncEnumerable<IEvent> ReadAll(string queueName, CancellationToken cancellationToken = default);
IAsyncEnumerable<IEvent<TEvent>> ReadEvents<TEvent>(string queueName, CancellationToken cancellationToken = default);
IAsyncEnumerable<IEvent> ReadAllAsync(string queueName, CancellationToken cancellationToken = default);
}

public static class EventQueueExtensions
Expand All @@ -21,4 +22,17 @@ public static Task<bool> EnqueueAsync<TEvent>(this IEventQueue eventQueue, TEven
{
return eventQueue.EnqueueAsync(DefaultQueueName, @event, properties);
}

public static async IAsyncEnumerable<IEvent<TEvent>> ReadEventsAsync<TEvent>(
this IEventQueue eventQueue,
string queueName,
[EnumeratorCancellation] CancellationToken cancellationToken = default
)
{
await foreach (var @event in eventQueue.ReadAllAsync(queueName, cancellationToken))
{
if(@event is IEvent<TEvent> eventEvent)
yield return eventEvent;
}
}
}

0 comments on commit 5a5f507

Please sign in to comment.