Skip to content

Commit

Permalink
refactor: update AckQueue
Browse files Browse the repository at this point in the history
  • Loading branch information
WeihanLi committed Jan 4, 2025
1 parent c689a00 commit 6a91e9f
Show file tree
Hide file tree
Showing 2 changed files with 36 additions and 8 deletions.
4 changes: 2 additions & 2 deletions src/WeihanLi.Common/Event/AckQueue.cs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ public sealed class AckQueueOptions

public bool AutoRequeue { get; set; } = true;

public TimeSpan Requeue { get; set; } = TimeSpan.FromMinutes(1);
public TimeSpan RequeuePeriod { get; set; } = TimeSpan.FromMinutes(1);
}

public sealed class AckQueue : DisposableBase
Expand All @@ -27,7 +27,7 @@ public AckQueue(AckQueueOptions options)
_options = options;
if (options.AutoRequeue)
{
_timer = new Timer(_ => RequeueUnAckedMessages(), null, options.Requeue, options.Requeue);
_timer = new Timer(_ => RequeueUnAckedMessages(), null, options.RequeuePeriod, options.RequeuePeriod);
}
}

Expand Down
40 changes: 34 additions & 6 deletions test/WeihanLi.Common.Test/EventsTest/AckQueueTest.cs
Original file line number Diff line number Diff line change
Expand Up @@ -54,17 +54,45 @@ public async Task AckMessageAsync_ShouldAcknowledgeAndRemoveMessage()
public async Task RequeueUnAckedMessagesAsync_ShouldRequeueUnAckedMessagesAfterTimeout()
{
var testEvent = new TestEvent { Message = "Test Message" };
await _ackQueue.EnqueueAsync(testEvent);

var dequeuedEvent = await _ackQueue.DequeueAsync<TestEvent>();
var ackQueue = new AckQueue(new()
{
AutoRequeue = false,
AckTimeout = TimeSpan.FromSeconds(3)
});
await ackQueue.EnqueueAsync(testEvent);

var dequeuedEvent = await ackQueue.DequeueAsync<TestEvent>();
Assert.NotNull(dequeuedEvent);

// Simulate timeout
await Task.Delay(TimeSpan.FromMinutes(2));
await Task.Delay(TimeSpan.FromSeconds(5));

ackQueue.RequeueUnAckedMessages();

_ackQueue.RequeueUnAckedMessages();
var requeuedEvent = await ackQueue.DequeueAsync<TestEvent>();
Assert.NotNull(requeuedEvent);
Assert.Equal(testEvent.Message, requeuedEvent.Data.Message);
}

[Fact]
public async Task AutoRequeueUnAckedMessagesAsync_ShouldRequeueUnAckedMessagesAfterTimeout()
{
var testEvent = new TestEvent { Message = "Test Message" };
var ackQueue = new AckQueue(new()
{
AutoRequeue = true,
AckTimeout = TimeSpan.FromSeconds(3),
RequeuePeriod = TimeSpan.FromSeconds(2)
});
await ackQueue.EnqueueAsync(testEvent);

var dequeuedEvent = await ackQueue.DequeueAsync<TestEvent>();
Assert.NotNull(dequeuedEvent);

// Simulate timeout
await Task.Delay(TimeSpan.FromSeconds(5));

var requeuedEvent = await _ackQueue.DequeueAsync<TestEvent>();
var requeuedEvent = await ackQueue.DequeueAsync<TestEvent>();
Assert.NotNull(requeuedEvent);
Assert.Equal(testEvent.Message, requeuedEvent.Data.Message);
}
Expand Down

0 comments on commit 6a91e9f

Please sign in to comment.