Skip to content

Commit

Permalink
Allow preferring a specified stream in queue.Read
Browse files Browse the repository at this point in the history
By default the queue client's Read method uses an approach to reading
multi-stream queues that attempts to share the consumer resources fairly
between all the streams.

This commit adds a PreferStream field to queue.ReadArgs which, if it
identifies a valid stream in the queue, is checked in preference to all
other streams.

If PreferStream is empty, or no messages can be found in PreferStream
within the specified Block time, Read will fall back to checking all
other streams in the queue.

The client is expected to keep track of the value of Stream returned in
the most recent Message and pass that back in as the value of
PreferStream.

This read behavior is most useful in the context of hotswaps, where each
stream represents a specific workload (i.e. a specific value of
"replicate_weights"). We can create "affinity" between a specific
workload and a specific instance (or instances) of director by reading
greedily from the same stream as much as possible.

Co-authored-by: F <[email protected]>
  • Loading branch information
nickstenning and erbridge committed Oct 16, 2024
1 parent 5468067 commit 02d966d
Show file tree
Hide file tree
Showing 3 changed files with 172 additions and 32 deletions.
72 changes: 72 additions & 0 deletions queue/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package queue
import (
"context"
"fmt"
"regexp"
"strconv"
"strings"
"time"
Expand All @@ -16,6 +17,8 @@ import (
var (
ErrInvalidReadArgs = fmt.Errorf("queue: invalid read arguments")
ErrInvalidWriteArgs = fmt.Errorf("queue: invalid write arguments")

streamSuffixPattern = regexp.MustCompile(`\A:s(\d+)\z`)
)

type Client struct {
Expand Down Expand Up @@ -61,9 +64,62 @@ func (c *Client) Read(ctx context.Context, args *ReadArgs) (*Message, error) {
return nil, fmt.Errorf("%w: consumer cannot be empty", ErrInvalidReadArgs)
}

if args.PreferStream != "" {
return c.readWithPreferredStream(ctx, args)
}
return c.read(ctx, args)
}

func (c *Client) readWithPreferredStream(ctx context.Context, args *ReadArgs) (*Message, error) {
// First we validate PreferStream. If it makes sense, we'll do an XREADGROUP
// against that stream. If it doesn't, we'll start things off with a normal
// round-robin read.
sid := strings.TrimPrefix(args.PreferStream, args.Name)
if ok := streamSuffixPattern.MatchString(sid); !ok {
return c.read(ctx, args)
}

// go-redis defines the behavior for the zero value of Block as blocking
// indefinitely, which is the opposite of the default behavior of redis
// itself. Map 0 to -1 so we get non-blocking behavior if Block is not set.
//
// See: https://github.com/redis/go-redis/issues/1941
block := args.Block
if block == 0 {
block = -1
}

result, err := c.rdb.XReadGroup(ctx, &redis.XReadGroupArgs{
Group: args.Group,
Consumer: args.Consumer,
Streams: []string{args.PreferStream, ">"},
Block: block,
Count: 1,
}).Result()
switch {
case err == redis.Nil:
// We try once more with a round-robin read if we got nothing from our start
// stream.
return c.readOnce(ctx, args)
case err != nil:
fmt.Printf("got err: %v\n", err)
return nil, err
}

msg, err := parseXStreamSlice(result)
if err != nil {
return nil, err
}

d, err := calculatePickupDelayFromID(msg.ID)
if err == nil {
span := trace.SpanFromContext(ctx)
span.SetAttributes(pickupDelay(d))
}

return msg, nil
}

func (c *Client) read(ctx context.Context, args *ReadArgs) (*Message, error) {
result, err := c.readOnce(ctx, args)
if result != nil || (err != nil && err != Empty) {
Expand Down Expand Up @@ -234,6 +290,22 @@ func parse(v any) (*Message, error) {
}, nil
}

func parseXStreamSlice(streams []redis.XStream) (*Message, error) {
if len(streams) != 1 {
return nil, fmt.Errorf("must have single stream, got %d", len(streams))
}
stream := streams[0]
if len(stream.Messages) != 1 {
return nil, fmt.Errorf("must have single message, got %d", len(stream.Messages))
}
message := stream.Messages[0]
return &Message{
Stream: stream.Stream,
ID: message.ID,
Values: message.Values,
}, nil
}

func parseSliceWithLength(v any, length int) ([]any, error) {
slice, ok := v.([]any)
if !ok {
Expand Down
123 changes: 95 additions & 28 deletions queue/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -132,43 +132,110 @@ func TestClientBlockIntegration(t *testing.T) {
}
}

func messageOrderDefault(queues, messagesPerQueue int) []string {
// We expect to read one message from each stream in turn.
expected := make([]string, 0, queues*messagesPerQueue)
for message := range messagesPerQueue {
for queue := range queues {
expected = append(expected, fmt.Sprintf("%d-%d", queue, message))
}
}
return expected
}

func messageOrderPreferredStream(queues, messagesPerQueue int) []string {
// We expect to read all messages from queue 0 before moving on to queue 1,
// etc.
expected := make([]string, 0, queues*messagesPerQueue)
for queue := range queues {
for message := range messagesPerQueue {
expected = append(expected, fmt.Sprintf("%d-%d", queue, message))
}
}
return expected
}

func TestClientReadIntegration(t *testing.T) {
ctx := test.Context(t)
rdb := test.Redis(ctx, t)

ttl := 24 * time.Hour
client := queue.NewClient(rdb, ttl)
require.NoError(t, client.Prepare(ctx))
testcases := []struct {
Name string
Block time.Duration
TrackLastStream bool
ExpectFn func(queues, messagesPerQueue int) []string
}{
{
Name: "Default (non-blocking)",
ExpectFn: messageOrderDefault,
},
{
Name: "Default (blocking)",
Block: 10 * time.Millisecond,
ExpectFn: messageOrderDefault,
},
{
Name: "PreferStream (non-blocking)",
TrackLastStream: true,
ExpectFn: messageOrderPreferredStream,
},
{
Name: "PreferStream (blocking)",
Block: 10 * time.Millisecond,
TrackLastStream: true,
ExpectFn: messageOrderPreferredStream,
},
}

// Prepare a queue with 4 streams
require.NoError(t, rdb.HSet(ctx, "myqueue:meta", "streams", 4).Err())
for _, tc := range testcases {
t.Run(tc.Name, func(t *testing.T) {
queues := 4
messagesPerQueue := 10
ttl := 24 * time.Hour
client := queue.NewClient(rdb, ttl)
require.NoError(t, client.Prepare(ctx))

// Prepare a queue
require.NoError(t, rdb.HSet(ctx, "myqueue:meta", "streams", queues).Err())

for i := range queues {
for j := range messagesPerQueue {
require.NoError(t, rdb.XAdd(ctx, &redis.XAddArgs{
Stream: fmt.Sprintf("myqueue:s%d", i),
Values: map[string]any{
"idx": fmt.Sprintf("%d-%d", i, j),
},
}).Err())
}
}

for i := range 4 {
for j := range 10 {
require.NoError(t, rdb.XAdd(ctx, &redis.XAddArgs{
Stream: fmt.Sprintf("myqueue:s%d", i),
Values: map[string]any{
"idx": fmt.Sprintf("%d-%d", i, j),
},
}).Err())
}
}
var lastStream string
msgs := make([]string, 0, queues*messagesPerQueue)
for {
readArgs := &queue.ReadArgs{
Name: "myqueue",
Group: "mygroup",
Consumer: "mygroup:123",
Block: tc.Block,
}
if tc.TrackLastStream {
readArgs.PreferStream = lastStream
}
msg, err := client.Read(ctx, readArgs)
if errors.Is(err, queue.Empty) {
break
}
require.NoError(t, err)
lastStream = msg.Stream
msgs = append(msgs, msg.Values["idx"].(string))
}

msgs := make(map[string]struct{})
for {
msg, err := client.Read(ctx, &queue.ReadArgs{
Name: "myqueue",
Group: "mygroup",
Consumer: "mygroup:123",
expected := tc.ExpectFn(queues, messagesPerQueue)

assert.Len(t, msgs, queues*messagesPerQueue)
assert.EqualValues(t, expected, msgs)
})
if errors.Is(err, queue.Empty) {
break
}
require.NoError(t, err)
msgs[msg.Values["idx"].(string)] = struct{}{}
}

assert.Len(t, msgs, 40)
}

func TestClientReadLegacyStreamIntegration(t *testing.T) {
Expand Down
9 changes: 5 additions & 4 deletions queue/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,10 +24,11 @@ type WriteArgs struct {
}

type ReadArgs struct {
Name string // queue name
Group string // consumer group name
Consumer string // consumer ID
Block time.Duration // total blocking time
Name string // queue name
Group string // consumer group name
Consumer string // consumer ID
PreferStream string // if specified, prefer reading from this stream
Block time.Duration // total blocking time
}

type Message struct {
Expand Down

0 comments on commit 02d966d

Please sign in to comment.