Skip to content

Commit

Permalink
Merge pull request #241 from WeihanLi/dev
Browse files Browse the repository at this point in the history
1.0.74 preview 2
  • Loading branch information
WeihanLi authored Jan 4, 2025
2 parents 5a5f507 + 83425b8 commit 9483875
Show file tree
Hide file tree
Showing 2 changed files with 214 additions and 0 deletions.
109 changes: 109 additions & 0 deletions src/WeihanLi.Common/Event/AckQueue.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,109 @@
using System.Collections.Concurrent;
using System.Runtime.CompilerServices;
using WeihanLi.Common.Helpers;

namespace WeihanLi.Common.Event;

public sealed class AckQueueOptions
{
public TimeSpan AckTimeout { get; set; } = TimeSpan.FromMinutes(1);

public bool AutoRequeue { get; set; }

public TimeSpan RequeuePeriod { get; set; } = TimeSpan.FromMinutes(1);
}

public sealed class AckQueue : DisposableBase
{
private readonly AckQueueOptions _options;
private readonly ConcurrentQueue<IEvent> _queue = new();
private readonly ConcurrentDictionary<string, IEvent> _unAckedMessages = new();
private readonly Timer? _timer;

public AckQueue() : this(new()) { }

public AckQueue(AckQueueOptions options)
{
_options = options;
if (options.AutoRequeue)
{
_timer = new Timer(_ => RequeueUnAckedMessages(), null, options.RequeuePeriod, options.RequeuePeriod);
}
}

public Task EnqueueAsync<TEvent>(TEvent @event, EventProperties? properties = null)
{
properties ??= new EventProperties();
if (string.IsNullOrEmpty(properties.EventId))
{
properties.EventId = Guid.NewGuid().ToString();
}

if (properties.EventAt == default)
{
properties.EventAt = DateTimeOffset.Now;
}

var internalEvent = new EventWrapper<TEvent>
{
Data = @event,
Properties = properties
};

_queue.Enqueue(internalEvent);
return Task.CompletedTask;
}

public Task<IEvent<TEvent>?> DequeueAsync<TEvent>()
{
if (_queue.TryDequeue(out var eventWrapper))
{
_unAckedMessages.TryAdd(eventWrapper.Properties.EventId, eventWrapper);
return Task.FromResult((IEvent<TEvent>?)eventWrapper);
}

return Task.FromResult<IEvent<TEvent>?>(null);
}

public Task AckMessageAsync(string eventId)
{
_unAckedMessages.TryRemove(eventId, out _);
return Task.CompletedTask;
}

public void RequeueUnAckedMessages()
{
foreach (var message in _unAckedMessages)
{
if (DateTimeOffset.Now - message.Value.Properties.EventAt > _options.AckTimeout)
{
if (_unAckedMessages.TryRemove(message.Key, out var eventWrapper)
&& eventWrapper != null)
{
_queue.Enqueue(eventWrapper);
}
}
}
}

public async IAsyncEnumerable<IEvent> ReadAllAsync(
[EnumeratorCancellation] CancellationToken cancellationToken = default)
{
while (!cancellationToken.IsCancellationRequested)
{
while (_queue.TryDequeue(out var eventWrapper))
{
_unAckedMessages.TryAdd(eventWrapper.Properties.EventId, eventWrapper);
yield return eventWrapper;
}

await Task.Delay(200, cancellationToken);
}
}

protected override void Dispose(bool disposing)
{
_timer?.Dispose();
base.Dispose(disposing);
}
}
105 changes: 105 additions & 0 deletions test/WeihanLi.Common.Test/EventsTest/AckQueueTest.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,105 @@
using WeihanLi.Common.Event;
using Xunit;

namespace WeihanLi.Common.Test.EventsTest
{
public class AckQueueTest
{
private readonly AckQueue _ackQueue = new(new()
{
AutoRequeue = false
});

[Fact]
public async Task EnqueueAsync_ShouldAddMessageToQueue()
{
var testEvent = new TestEvent { Message = "Test Message" };
await _ackQueue.EnqueueAsync(testEvent);

var dequeuedEvent = await _ackQueue.DequeueAsync<TestEvent>();
Assert.NotNull(dequeuedEvent);
Assert.Equal(testEvent.Message, dequeuedEvent.Data.Message);
}

[Fact]
public async Task DequeueAsync_ShouldRetrieveMessageWithoutRemoval()
{
var testEvent = new TestEvent { Message = "Test Message" };
await _ackQueue.EnqueueAsync(testEvent);

var dequeuedEvent1 = await _ackQueue.DequeueAsync<TestEvent>();
var dequeuedEvent2 = await _ackQueue.DequeueAsync<TestEvent>();

Assert.NotNull(dequeuedEvent1);
Assert.Equal(testEvent.Message, dequeuedEvent1.Data.Message);
Assert.Null(dequeuedEvent2);
}

[Fact]
public async Task AckMessageAsync_ShouldAcknowledgeAndRemoveMessage()
{
var testEvent = new TestEvent { Message = "Test Message" };
await _ackQueue.EnqueueAsync(testEvent);

var dequeuedEvent = await _ackQueue.DequeueAsync<TestEvent>();
Assert.NotNull(dequeuedEvent);

await _ackQueue.AckMessageAsync(dequeuedEvent.Properties.EventId);

var dequeuedEventAfterAck = await _ackQueue.DequeueAsync<TestEvent>();
Assert.Null(dequeuedEventAfterAck);
}

[Fact]
public async Task RequeueUnAckedMessagesAsync_ShouldRequeueUnAckedMessagesAfterTimeout()
{
var testEvent = new TestEvent { Message = "Test Message" };
var ackQueue = new AckQueue(new()
{
AutoRequeue = false,
AckTimeout = TimeSpan.FromSeconds(3)
});
await ackQueue.EnqueueAsync(testEvent);

var dequeuedEvent = await ackQueue.DequeueAsync<TestEvent>();
Assert.NotNull(dequeuedEvent);

// Simulate timeout
await Task.Delay(TimeSpan.FromSeconds(5));

ackQueue.RequeueUnAckedMessages();

var requeuedEvent = await ackQueue.DequeueAsync<TestEvent>();
Assert.NotNull(requeuedEvent);
Assert.Equal(testEvent.Message, requeuedEvent.Data.Message);
}

[Fact]
public async Task AutoRequeueUnAckedMessagesAsync_ShouldRequeueUnAckedMessagesAfterTimeout()
{
var testEvent = new TestEvent { Message = "Test Message" };
await using var ackQueue = new AckQueue(new()
{
AutoRequeue = true,
AckTimeout = TimeSpan.FromSeconds(3),
RequeuePeriod = TimeSpan.FromSeconds(2)
});
await ackQueue.EnqueueAsync(testEvent);

var dequeuedEvent = await ackQueue.DequeueAsync<TestEvent>();
Assert.NotNull(dequeuedEvent);

// Simulate timeout
await Task.Delay(TimeSpan.FromSeconds(5));

var requeuedEvent = await ackQueue.DequeueAsync<TestEvent>();
Assert.NotNull(requeuedEvent);
Assert.Equal(testEvent.Message, requeuedEvent.Data.Message);
}

private class TestEvent
{
public string Message { get; set; }
}
}
}

0 comments on commit 9483875

Please sign in to comment.