diff --git a/src/WeihanLi.Common/Event/EventQueueInMemory.cs b/src/WeihanLi.Common/Event/EventQueueInMemory.cs index eebd01c2..c5f22e9a 100644 --- a/src/WeihanLi.Common/Event/EventQueueInMemory.cs +++ b/src/WeihanLi.Common/Event/EventQueueInMemory.cs @@ -3,20 +3,26 @@ using System.Collections.Concurrent; using System.Diagnostics; -using System.Diagnostics.CodeAnalysis; using System.Runtime.CompilerServices; +#if NET +using System.Threading.Channels; +#endif + namespace WeihanLi.Common.Event; public sealed class EventQueueInMemory : IEventQueue { +#if NET + private readonly ConcurrentDictionary> _eventQueues = new(); +#else private readonly ConcurrentDictionary> _eventQueues = new(); - +#endif public ICollection GetQueues() => _eventQueues.Keys; public Task> GetQueuesAsync() => Task.FromResult(GetQueues()); - public Task EnqueueAsync(string queueName, TEvent @event, EventProperties? properties = null) + public async Task EnqueueAsync(string queueName, TEvent @event, EventProperties? properties = null) { properties ??= new(); if (string.IsNullOrEmpty(properties.EventId)) @@ -36,16 +42,26 @@ public Task EnqueueAsync(string queueName, TEvent @event, EventPro Data = @event, Properties = properties }; +#if NET + var queue = _eventQueues.GetOrAdd(queueName, _ => Channel.CreateUnbounded()); + await queue.Writer.WriteAsync(internalEvent); +#else var queue = _eventQueues.GetOrAdd(queueName, _ => new ConcurrentQueue()); queue.Enqueue(internalEvent); - return Task.FromResult(true); + await Task.CompletedTask; +#endif + return true; } public Task?> DequeueAsync(string queueName) { if (_eventQueues.TryGetValue(queueName, out var queue)) { +#if NET + if (queue.Reader.TryRead(out var eventWrapper)) +#else if (queue.TryDequeue(out var eventWrapper)) +#endif { return Task.FromResult((IEvent?)eventWrapper); } @@ -61,12 +77,20 @@ public async IAsyncEnumerable ReadAllAsync(string queueName, { if (_eventQueues.TryGetValue(queueName, out var queue)) { +#if NET + await foreach (var @event in queue.Reader.ReadAllAsync(cancellationToken)) + { + yield return @event; + } +#else while (queue.TryDequeue(out var eventWrapper)) { yield return eventWrapper; } +#endif } - await Task.Delay(100, cancellationToken); + + await Task.Delay(200, cancellationToken); } } }