Skip to content

Commit

Permalink
feat: use Channel for EventQueueInMemory
Browse files Browse the repository at this point in the history
  • Loading branch information
WeihanLi committed Jan 4, 2025
1 parent 0efe26a commit 8de85fe
Showing 1 changed file with 29 additions and 5 deletions.
34 changes: 29 additions & 5 deletions src/WeihanLi.Common/Event/EventQueueInMemory.cs
Original file line number Diff line number Diff line change
Expand Up @@ -3,20 +3,26 @@

using System.Collections.Concurrent;
using System.Diagnostics;
using System.Diagnostics.CodeAnalysis;
using System.Runtime.CompilerServices;

#if NET
using System.Threading.Channels;
#endif

namespace WeihanLi.Common.Event;

public sealed class EventQueueInMemory : IEventQueue
{
#if NET
private readonly ConcurrentDictionary<string, Channel<IEvent>> _eventQueues = new();
#else
private readonly ConcurrentDictionary<string, ConcurrentQueue<IEvent>> _eventQueues = new();

#endif
public ICollection<string> GetQueues() => _eventQueues.Keys;

public Task<ICollection<string>> GetQueuesAsync() => Task.FromResult(GetQueues());

public Task<bool> EnqueueAsync<TEvent>(string queueName, TEvent @event, EventProperties? properties = null)
public async Task<bool> EnqueueAsync<TEvent>(string queueName, TEvent @event, EventProperties? properties = null)
{
properties ??= new();
if (string.IsNullOrEmpty(properties.EventId))
Expand All @@ -36,16 +42,26 @@ public Task<bool> EnqueueAsync<TEvent>(string queueName, TEvent @event, EventPro
Data = @event,
Properties = properties
};
#if NET
var queue = _eventQueues.GetOrAdd(queueName, _ => Channel.CreateUnbounded<IEvent>());
await queue.Writer.WriteAsync(internalEvent);
#else
var queue = _eventQueues.GetOrAdd(queueName, _ => new ConcurrentQueue<IEvent>());
queue.Enqueue(internalEvent);
return Task.FromResult(true);
await Task.CompletedTask;
#endif
return true;
}

public Task<IEvent<TEvent>?> DequeueAsync<TEvent>(string queueName)
{
if (_eventQueues.TryGetValue(queueName, out var queue))
{
#if NET
if (queue.Reader.TryRead(out var eventWrapper))
#else
if (queue.TryDequeue(out var eventWrapper))
#endif
{
return Task.FromResult((IEvent<TEvent>?)eventWrapper);
}
Expand All @@ -61,12 +77,20 @@ public async IAsyncEnumerable<IEvent> ReadAllAsync(string queueName,
{
if (_eventQueues.TryGetValue(queueName, out var queue))
{
#if NET
await foreach (var @event in queue.Reader.ReadAllAsync(cancellationToken))
{
yield return @event;
}
#else
while (queue.TryDequeue(out var eventWrapper))
{
yield return eventWrapper;
}
#endif
}
await Task.Delay(100, cancellationToken);

await Task.Delay(200, cancellationToken);
}
}
}

0 comments on commit 8de85fe

Please sign in to comment.