From 02d966dd5a33018075ac70d151a3b6241082e810 Mon Sep 17 00:00:00 2001
From: Nick Stenning
Date: Wed, 16 Oct 2024 13:28:14 +0200
Subject: [PATCH] Allow preferring a specified stream in queue.Read

By default the queue client's Read method uses an approach to reading
multi-stream queues that attempts to share the consumer resources fairly
between all the streams.

This commit adds a PreferStream field to queue.ReadArgs which, if it
identifies a valid stream in the queue, is checked in preference to all
other streams. If PreferStream is empty, or no messages can be found in
PreferStream within the specified Block time, Read will fall back to
checking all other streams in the queue.

The client is expected to keep track of the value of Stream returned in
the most recent Message and pass that back in as the value of
PreferStream.

This read behavior is most useful in the context of hotswaps, where each
stream represents a specific workload (i.e. a specific value of
"replicate_weights"). We can create "affinity" between a specific
workload and a specific instance (or instances) of director by reading
greedily from the same stream as much as possible.

Co-authored-by: F
---
 queue/client.go      |  72 +++++++++++++++++++++++++
 queue/client_test.go | 123 +++++++++++++++++++++++++++++++++----------
 queue/types.go       |   9 ++--
 3 files changed, 172 insertions(+), 32 deletions(-)

diff --git a/queue/client.go b/queue/client.go
index 4fc781c..8e8768f 100644
--- a/queue/client.go
+++ b/queue/client.go
@@ -3,6 +3,7 @@ package queue
 import (
 	"context"
 	"fmt"
+	"regexp"
 	"strconv"
 	"strings"
 	"time"
@@ -16,6 +17,8 @@ import (
 var (
 	ErrInvalidReadArgs  = fmt.Errorf("queue: invalid read arguments")
 	ErrInvalidWriteArgs = fmt.Errorf("queue: invalid write arguments")
+
+	streamSuffixPattern = regexp.MustCompile(`\A:s(\d+)\z`)
 )
 
 type Client struct {
@@ -61,9 +64,62 @@ func (c *Client) Read(ctx context.Context, args *ReadArgs) (*Message, error) {
 		return nil, fmt.Errorf("%w: consumer cannot be empty", ErrInvalidReadArgs)
 	}
 
+	if args.PreferStream != "" {
+		return c.readWithPreferredStream(ctx, args)
+	}
 	return c.read(ctx, args)
 }
 
+func (c *Client) readWithPreferredStream(ctx context.Context, args *ReadArgs) (*Message, error) {
+	// First we validate PreferStream. If it makes sense, we'll do an XREADGROUP
+	// against that stream. If it doesn't, we'll start things off with a normal
+	// round-robin read.
+	sid := strings.TrimPrefix(args.PreferStream, args.Name)
+	if ok := streamSuffixPattern.MatchString(sid); !ok {
+		return c.read(ctx, args)
+	}
+
+	// go-redis defines the behavior for the zero value of Block as blocking
+	// indefinitely, which is the opposite of the default behavior of redis
+	// itself. Map 0 to -1 so we get non-blocking behavior if Block is not set.
+	//
+	// See: https://github.com/redis/go-redis/issues/1941
+	block := args.Block
+	if block == 0 {
+		block = -1
+	}
+
+	result, err := c.rdb.XReadGroup(ctx, &redis.XReadGroupArgs{
+		Group:    args.Group,
+		Consumer: args.Consumer,
+		Streams:  []string{args.PreferStream, ">"},
+		Block:    block,
+		Count:    1,
+	}).Result()
+	switch {
+	case err == redis.Nil:
+		// Nothing was available on the preferred stream, so fall back to a
+		// single round-robin read.
+		return c.readOnce(ctx, args)
+	case err != nil:
+		// Hand any other failure back to the caller unchanged.
+		return nil, err
+	}
+
+	msg, err := parseXStreamSlice(result)
+	if err != nil {
+		return nil, err
+	}
+
+	d, err := calculatePickupDelayFromID(msg.ID)
+	if err == nil {
+		span := trace.SpanFromContext(ctx)
+		span.SetAttributes(pickupDelay(d))
+	}
+
+	return msg, nil
+}
+
 func (c *Client) read(ctx context.Context, args *ReadArgs) (*Message, error) {
 	result, err := c.readOnce(ctx, args)
 	if result != nil || (err != nil && err != Empty) {
@@ -234,6 +290,22 @@ func parse(v any) (*Message, error) {
 	}, nil
 }
 
+func parseXStreamSlice(streams []redis.XStream) (*Message, error) {
+	if len(streams) != 1 {
+		return nil, fmt.Errorf("must have single stream, got %d", len(streams))
+	}
+	stream := streams[0]
+	if len(stream.Messages) != 1 {
+		return nil, fmt.Errorf("must have single message, got %d", len(stream.Messages))
+	}
+	message := stream.Messages[0]
+	return &Message{
+		Stream: stream.Stream,
+		ID:     message.ID,
+		Values: message.Values,
+	}, nil
+}
+
 func parseSliceWithLength(v any, length int) ([]any, error) {
 	slice, ok := v.([]any)
 	if !ok {
diff --git a/queue/client_test.go b/queue/client_test.go
index 9a96cf1..87d204f 100644
--- a/queue/client_test.go
+++ b/queue/client_test.go
@@ -132,43 +132,110 @@ func TestClientBlockIntegration(t *testing.T) {
 	}
 }
 
+func messageOrderDefault(queues, messagesPerQueue int) []string {
+	// We expect to read one message from each stream in turn.
+	expected := make([]string, 0, queues*messagesPerQueue)
+	for message := range messagesPerQueue {
+		for queue := range queues {
+			expected = append(expected, fmt.Sprintf("%d-%d", queue, message))
+		}
+	}
+	return expected
+}
+
+func messageOrderPreferredStream(queues, messagesPerQueue int) []string {
+	// We expect to read all messages from queue 0 before moving on to queue 1,
+	// etc.
+	expected := make([]string, 0, queues*messagesPerQueue)
+	for queue := range queues {
+		for message := range messagesPerQueue {
+			expected = append(expected, fmt.Sprintf("%d-%d", queue, message))
+		}
+	}
+	return expected
+}
+
 func TestClientReadIntegration(t *testing.T) {
 	ctx := test.Context(t)
 	rdb := test.Redis(ctx, t)
 
-	ttl := 24 * time.Hour
-	client := queue.NewClient(rdb, ttl)
-	require.NoError(t, client.Prepare(ctx))
+	testcases := []struct {
+		Name            string
+		Block           time.Duration
+		TrackLastStream bool
+		ExpectFn        func(queues, messagesPerQueue int) []string
+	}{
+		{
+			Name:     "Default (non-blocking)",
+			ExpectFn: messageOrderDefault,
+		},
+		{
+			Name:     "Default (blocking)",
+			Block:    10 * time.Millisecond,
+			ExpectFn: messageOrderDefault,
+		},
+		{
+			Name:            "PreferStream (non-blocking)",
+			TrackLastStream: true,
+			ExpectFn:        messageOrderPreferredStream,
+		},
+		{
+			Name:            "PreferStream (blocking)",
+			Block:           10 * time.Millisecond,
+			TrackLastStream: true,
+			ExpectFn:        messageOrderPreferredStream,
+		},
+	}
 
-	// Prepare a queue with 4 streams
-	require.NoError(t, rdb.HSet(ctx, "myqueue:meta", "streams", 4).Err())
+	for _, tc := range testcases {
+		t.Run(tc.Name, func(t *testing.T) {
+			queues := 4
+			messagesPerQueue := 10
+			ttl := 24 * time.Hour
+			client := queue.NewClient(rdb, ttl)
+			require.NoError(t, client.Prepare(ctx))
+
+			// Prepare a queue
+			require.NoError(t, rdb.HSet(ctx, "myqueue:meta", "streams", queues).Err())
+
+			for i := range queues {
+				for j := range messagesPerQueue {
+					require.NoError(t, rdb.XAdd(ctx, &redis.XAddArgs{
+						Stream: fmt.Sprintf("myqueue:s%d", i),
+						Values: map[string]any{
+							"idx": fmt.Sprintf("%d-%d", i, j),
+						},
+					}).Err())
+				}
+			}
 
-	for i := range 4 {
-		for j := range 10 {
-			require.NoError(t, rdb.XAdd(ctx, &redis.XAddArgs{
-				Stream: fmt.Sprintf("myqueue:s%d", i),
-				Values: map[string]any{
-					"idx": fmt.Sprintf("%d-%d", i, j),
-				},
-			}).Err())
-		}
-	}
+			var lastStream string
+			msgs := make([]string, 0, queues*messagesPerQueue)
+			for {
+				readArgs := &queue.ReadArgs{
+					Name:     "myqueue",
+					Group:    "mygroup",
+					Consumer: "mygroup:123",
+					Block:    tc.Block,
+				}
+				if tc.TrackLastStream {
+					readArgs.PreferStream = lastStream
+				}
+				msg, err := client.Read(ctx, readArgs)
+				if errors.Is(err, queue.Empty) {
+					break
+				}
+				require.NoError(t, err)
+				lastStream = msg.Stream
+				msgs = append(msgs, msg.Values["idx"].(string))
+			}
 
-	msgs := make(map[string]struct{})
-	for {
-		msg, err := client.Read(ctx, &queue.ReadArgs{
-			Name:     "myqueue",
-			Group:    "mygroup",
-			Consumer: "mygroup:123",
+			expected := tc.ExpectFn(queues, messagesPerQueue)
+
+			assert.Len(t, msgs, queues*messagesPerQueue)
+			assert.EqualValues(t, expected, msgs)
 		})
-		if errors.Is(err, queue.Empty) {
-			break
-		}
-		require.NoError(t, err)
-		msgs[msg.Values["idx"].(string)] = struct{}{}
 	}
-
-	assert.Len(t, msgs, 40)
 }
 
 func TestClientReadLegacyStreamIntegration(t *testing.T) {
diff --git a/queue/types.go b/queue/types.go
index f53fbe3..a5c0105 100644
--- a/queue/types.go
+++ b/queue/types.go
@@ -24,10 +24,11 @@ type WriteArgs struct {
 }
 
 type ReadArgs struct {
-	Name     string        // queue name
-	Group    string        // consumer group name
-	Consumer string        // consumer ID
-	Block    time.Duration // total blocking time
+	Name         string        // queue name
+	Group        string        // consumer group name
+	Consumer     string        // consumer ID
+	PreferStream string        // if specified, prefer reading from this stream
+	Block        time.Duration // total blocking time
 }
 
 type Message struct {