Skip to content

Commit

Permalink
[queue] create PendingCount (to replace Lag) (#160)
Browse files Browse the repository at this point in the history
This adds a new PendingCount command which returns the aggregate number of
pending messages in a queue.

-----

I tried working with Lag but it has the problem that it is fundamentally
heuristic, based on keeping a track of two counters:

- how many messages have ever been added to the stream
- how many messages have ever been read by the given consumer group

"Lag" is then the difference between these counters - the number of messages
that have been added to the stream but not read by the consumer group.

Unfortunately, these counters can desync horribly if you ever XDEL a message
before it gets read by a consumer group.  This is because deleting a message
does not decrement the "added to stream" counter, nor does it increment the
"read by consumer group" counter.

Currently we rely on being able to XDEL messages.  Fixing that would be
nontrivial.

But we can measure the size of the PEL with XPENDING, and we can measure the
length of the stream with XLEN, so we can calculate lag as PendingCount() -
Len().
  • Loading branch information
philandstuff authored Dec 9, 2024
1 parent 1682dcc commit 6c095f7
Show file tree
Hide file tree
Showing 5 changed files with 47 additions and 69 deletions.
15 changes: 5 additions & 10 deletions queue/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,16 +43,11 @@ func (c *Client) Len(ctx context.Context, name string) (int64, error) {
return lenScript.RunRO(ctx, c.rdb, []string{name}).Int64()
}

// Lag calculates the aggregate lag (that is, number of messages that have not
// yet been read) of the consumer group for the given queue. It adds up the
// consumer group's lag of all the streams in the queue, based on the value
// reported by XINFO GROUPS for the stream. If any of the streams report a
// `nil` lag, Lag returns an error.
//
// As a side effect, Lag creates the consumer group if it does not already
// exist.
func (c *Client) Lag(ctx context.Context, queue string, group string) (int64, error) {
return lagScript.Run(ctx, c.rdb, []string{queue}, group).Int64()
// PendingCount counts the aggregate pending entries (that is, number of
// messages that have been read but not acknowledged) of the consumer group for
// the given queue, as reported by XPENDING.
func (c *Client) PendingCount(ctx context.Context, queue string, group string) (int64, error) {
return pendingCountScript.RunRO(ctx, c.rdb, []string{queue}, group).Int64()
}

// Read a single message from the queue. If the Block field of args is
Expand Down
13 changes: 4 additions & 9 deletions queue/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,10 +67,6 @@ func TestClientIntegration(t *testing.T) {
ids := make(map[string]struct{})

for i := range 15 {
lag, err := client.Lag(ctx, "test", "mygroup")
require.NoError(t, err)
assert.EqualValues(t, 15-i, lag)

msg, err := client.Read(ctx, &queue.ReadArgs{
Name: "test",
Group: "mygroup",
Expand All @@ -81,6 +77,10 @@ func TestClientIntegration(t *testing.T) {
assert.Contains(t, msg.Values, "type")
assert.Contains(t, msg.Values, "id")
ids[msg.Values["id"].(string)] = struct{}{}

pendingCount, err := client.PendingCount(ctx, "test", "mygroup")
require.NoError(t, err)
assert.EqualValues(t, i+1, pendingCount)
}

// We should have read all the messages we enqueued
Expand All @@ -99,11 +99,6 @@ func TestClientIntegration(t *testing.T) {
length, err = client.Len(ctx, "test")
require.NoError(t, err)
assert.EqualValues(t, 15, length)

// But Lag is now 0 (because we've XREADGROUPed everything)
lag, err := client.Lag(ctx, "test", "mygroup")
require.NoError(t, err)
assert.EqualValues(t, 0, lag)
}

// Check that the Block option works as expected
Expand Down
46 changes: 0 additions & 46 deletions queue/lag.lua

This file was deleted.

34 changes: 34 additions & 0 deletions queue/pendingcount.lua
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
-- pendingcount commands take the form
--
-- EVALSHA sha 1 key group
--
-- Note: strictly, it is illegal for a script to manipulate keys that are not
-- explicitly passed to EVAL{,SHA}, but in practice this is fine as long as all
-- keys are on the same server (e.g. in cluster scenarios). In our case a single
-- queue, which may be composed of multiple streams and metadata keys, is always
-- on the same server.

local base = KEYS[1]
local group = ARGV[1]

local key_meta = base .. ':meta'

local streams = tonumber(redis.call('HGET', key_meta, 'streams') or 1)
local result = 0

for idx = 0, streams-1 do
local stream = base .. ':s' .. idx

local info = redis.pcall('XPENDING', stream, group)
if info['err'] then
if string.match(info['err'], '^NOGROUP ') then
-- if either the stream or group don't exist, there are zero pending entries
else
return redis.error_reply(info['err']..' accessing '..stream)
end
else
result = result + info[1]
end
end

return result
8 changes: 4 additions & 4 deletions queue/queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,9 +44,9 @@ var (
lenCmd string
lenScript = redis.NewScript(lenCmd)

//go:embed lag.lua
lagCmd string
lagScript = redis.NewScript(lagCmd)
//go:embed pendingcount.lua
pendingCountCmd string
pendingCountScript = redis.NewScript(pendingCountCmd)

//go:embed read.lua
readCmd string
Expand All @@ -61,7 +61,7 @@ func prepare(ctx context.Context, rdb redis.Cmdable) error {
if err := lenScript.Load(ctx, rdb).Err(); err != nil {
return err
}
if err := lagScript.Load(ctx, rdb).Err(); err != nil {
if err := pendingCountScript.Load(ctx, rdb).Err(); err != nil {
return err
}
if err := readScript.Load(ctx, rdb).Err(); err != nil {
Expand Down

0 comments on commit 6c095f7

Please sign in to comment.