Skip to content

Commit

Permalink
[queue] add Lag() command to measure consumer group lag (#159)
Browse files Browse the repository at this point in the history
This adds a Lag() command to measure the lag of a consumer group over a queue.
Lag is the number of messages in the queue that have not yet been picked up by a
Read() command.
  • Loading branch information
philandstuff authored Dec 4, 2024
1 parent 3490956 commit 1682dcc
Show file tree
Hide file tree
Showing 4 changed files with 81 additions and 1 deletion.
12 changes: 12 additions & 0 deletions queue/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,18 @@ func (c *Client) Len(ctx context.Context, name string) (int64, error) {
return lenScript.RunRO(ctx, c.rdb, []string{name}).Int64()
}

// Lag calculates the aggregate lag (that is, number of messages that have not
// yet been read) of the consumer group for the given queue. It adds up the
// consumer group's lag of all the streams in the queue, based on the value
// reported by XINFO GROUPS for the stream. If any of the streams report a
// `nil` lag, Lag returns an error.
//
// As a side effect, Lag creates the consumer group if it does not already
// exist.
func (c *Client) Lag(ctx context.Context, queue string, group string) (int64, error) {
return lagScript.Run(ctx, c.rdb, []string{queue}, group).Int64()
}

// Read a single message from the queue. If the Block field of args is
// non-zero, the call may block for up to that duration waiting for a new
// message.
Expand Down
17 changes: 16 additions & 1 deletion queue/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,11 @@ func TestClientIntegration(t *testing.T) {

ids := make(map[string]struct{})

for range 15 {
for i := range 15 {
lag, err := client.Lag(ctx, "test", "mygroup")
require.NoError(t, err)
assert.EqualValues(t, 15-i, lag)

msg, err := client.Read(ctx, &queue.ReadArgs{
Name: "test",
Group: "mygroup",
Expand All @@ -88,7 +92,18 @@ func TestClientIntegration(t *testing.T) {
Group: "mygroup",
Consumer: "mygroup:123",
})

require.ErrorIs(t, err, queue.Empty)

// Len remains 15 (because we haven't XDELed the messages or XTRIMed the stream)
length, err = client.Len(ctx, "test")
require.NoError(t, err)
assert.EqualValues(t, 15, length)

// But Lag is now 0 (because we've XREADGROUPed everything)
lag, err := client.Lag(ctx, "test", "mygroup")
require.NoError(t, err)
assert.EqualValues(t, 0, lag)
}

// Check that the Block option works as expected
Expand Down
46 changes: 46 additions & 0 deletions queue/lag.lua
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
-- Lag commands take the form
--
-- EVALSHA sha 1 key group
--
-- Note: strictly, it is illegal for a script to manipulate keys that are not
-- explicitly passed to EVAL{,SHA}, but in practice this is fine as long as all
-- keys are on the same server (e.g. in cluster scenarios). In our case a single
-- queue, which may be composed of multiple streams and metadata keys, is always
-- on the same server.

local base = KEYS[1]
local group = ARGV[1]

local key_meta = base .. ':meta'

local streams = tonumber(redis.call('HGET', key_meta, 'streams') or 1)
local result = 0

for idx = 0, streams-1 do
local stream = base .. ':s' .. idx

-- the group must exist for us to measure its lag. we create it here; if it
-- already exists this returns a BUSYGROUP error which we ignore
redis.pcall('XGROUP', 'CREATE', stream, group, '0', 'MKSTREAM')

local info = redis.pcall('XINFO', 'GROUPS', stream)
if info['err'] == 'ERR no such key' then
-- if the stream doesn't exist, treat it as zero lag
elseif info['err'] then
return redis.error_reply(info['err']..' accessing '..stream)
else
for i,v in ipairs(info) do
if v[2] == group then
if not v[12] then
-- lag can be nil; we propagate this to the caller
return redis.error_reply('ERR unknown lag for group '..group..' on stream '..stream)
end

result = result + v[12]
break
end
end
end
end

return result
7 changes: 7 additions & 0 deletions queue/queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,10 @@ var (
lenCmd string
lenScript = redis.NewScript(lenCmd)

//go:embed lag.lua
lagCmd string
lagScript = redis.NewScript(lagCmd)

//go:embed read.lua
readCmd string
readScript = redis.NewScript(readCmd)
Expand All @@ -57,6 +61,9 @@ func prepare(ctx context.Context, rdb redis.Cmdable) error {
if err := lenScript.Load(ctx, rdb).Err(); err != nil {
return err
}
if err := lagScript.Load(ctx, rdb).Err(); err != nil {
return err
}
if err := readScript.Load(ctx, rdb).Err(); err != nil {
return err
}
Expand Down

0 comments on commit 1682dcc

Please sign in to comment.