Skip to content

Commit

Permalink
Update our len() function to account for pending
Browse files Browse the repository at this point in the history
To ensure that our len function returns data that is more accurate to
calculate the number of instances we need, subtract the XPENDING count
from the XLEN count. This reduces the cases where we a new version or
scaling config changes will result in a complete duplication of the pods
simply because pending elements in the queue are counted as being
in-queue.

Use of XPENDING will allow us to subtract anything that has been claimed
from the metric calculation.
  • Loading branch information
tempusfrangit committed Dec 17, 2024
1 parent 5828335 commit 5ae7c72
Show file tree
Hide file tree
Showing 2 changed files with 42 additions and 3 deletions.
25 changes: 23 additions & 2 deletions queue/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@ func TestClientIntegration(t *testing.T) {

ids := make(map[string]struct{})

// Read 10 of the 15 messages, so we can test that the rest are still there
for i := range 15 {
msg, err := client.Read(ctx, &queue.ReadArgs{
Name: "test",
Expand Down Expand Up @@ -96,10 +97,30 @@ func TestClientIntegration(t *testing.T) {

require.ErrorIs(t, err, queue.Empty)

// Len remains 15 (because we haven't XDELed the messages or XTRIMed the stream)
// Len is zero (because messages are claimed/pending)
length, err = client.Len(ctx, "test")
require.NoError(t, err)
assert.EqualValues(t, 15, length)
assert.EqualValues(t, 0, length)

// Add more elements to the queue
for range 5 {
_, err := client.Write(ctx, &queue.WriteArgs{
Name: "test",
Streams: 16,
StreamsPerShard: 2,
ShardKey: []byte("tuna"),
Values: map[string]any{
"type": "fish",
"id": id,
},
})
require.NoError(t, err)
id++
}
// Len is 5 as we have added more to the queue
length, err = client.Len(ctx, "test")
require.NoError(t, err)
assert.EqualValues(t, 5, length)
}

// Check that the Block option works as expected
Expand Down
20 changes: 19 additions & 1 deletion queue/len.lua
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,11 @@
-- queue, which may be composed of multiple streams and metadata keys, is always
-- on the same server.

-- Note: This script performs an additional step in logic to calculate how many elements
-- are in the queue that are unclaimed. This is done by calculating the number of elements
-- and subtracting the result from XPENDING. XPENDING requires a group name to be passed, so
-- the script will iterate through all groups in the stream.

local base = KEYS[1]

local key_meta = base .. ':meta'
Expand All @@ -16,7 +21,20 @@ local streams = tonumber(redis.call('HGET', key_meta, 'streams') or 1)
local result = 0

for idx = 0, streams-1 do
result = result + redis.call('XLEN', base .. ':s' .. idx)
local success, groupInfo = pcall(function()
return redis.call('XINFO', 'GROUPS', base .. ':s' .. idx)
end)
local pendingCount = 0
if success and groupInfo ~= nil and #groupInfo ~= 0 then
for _, group in ipairs(groupInfo) do
local groupName = group[2]
local pendingInfo = redis.call('XPENDING', base .. ':s' .. idx, groupName)
pendingCount = pendingCount + pendingInfo[1]
end
end

result = result + redis.call('XLEN', base .. ':s' .. idx) - pendingCount
result = (result <= 0) and nil or result
end

return result

0 comments on commit 5ae7c72

Please sign in to comment.