Skip to content

Commit

Permalink
chore(tests): add basic tests for the stream consumer group (#2533)
Browse files Browse the repository at this point in the history
  • Loading branch information
jonathanc-n authored Sep 17, 2024
1 parent 399961d commit 89c5b24
Showing 1 changed file with 108 additions and 0 deletions.
108 changes: 108 additions & 0 deletions tests/gocase/unit/type/stream/stream_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1119,6 +1119,114 @@ func TestStreamOffset(t *testing.T) {
require.Equal(t, msgID.ID, infoGroup.LastDeliveredID)
})

t.Run("XINFO Test idle time and pending messages, for issue #2478", func(t *testing.T) {
streamName := "test-stream-2478"
groupName := "test-group-2478"
consumerName := "test-consumer-2478"

rdb.Del(ctx, streamName)
rdb.XGroupDestroy(ctx, streamName, groupName)

for i := 1; i <= 5; i++ {
require.NoError(t, rdb.XAdd(ctx, &redis.XAddArgs{
Stream: streamName,
ID: fmt.Sprintf("%d-0", i),
Values: map[string]interface{}{"field": fmt.Sprintf("value%d", i)},
}).Err())
}

require.NoError(t, rdb.XGroupCreate(ctx, streamName, groupName, "0").Err())
r, err := rdb.XReadGroup(ctx, &redis.XReadGroupArgs{
Group: groupName,
Consumer: consumerName,
Streams: []string{streamName, ">"},
Count: 5,
}).Result()
require.NoError(t, err)
require.Len(t, r[0].Messages, 5)

time.Sleep(2 * time.Second)

consumers, err := rdb.XInfoConsumers(ctx, streamName, groupName).Result()
require.NoError(t, err)

var consumerInfo redis.XInfoConsumer
for _, c := range consumers {
if c.Name == consumerName {
consumerInfo = c
break
}
}

require.True(t, consumerInfo.Idle >= 2000)
require.Equal(t, int64(5), consumerInfo.Pending)

ackIDs := make([]string, 5)
for i := 1; i <= 5; i++ {
ackIDs[i-1] = fmt.Sprintf("%d-0", i)
}
require.NoError(t, rdb.XAck(ctx, streamName, groupName, ackIDs...).Err())

consumers, err = rdb.XInfoConsumers(ctx, streamName, groupName).Result()
require.NoError(t, err)

for _, c := range consumers {
if c.Name == consumerName {
consumerInfo = c
break
}
}

require.Equal(t, int64(0), consumerInfo.Pending)
})

t.Run("XINFO Test consumer removal and inactive time, for issue #2478", func(t *testing.T) {
streamName := "stream-test-2478"
groupName := "group-test-2478"
consumerName := "consumer-test-2478"

rdb.Del(ctx, streamName)
rdb.XGroupDestroy(ctx, streamName, groupName)

require.NoError(t, rdb.XAdd(ctx, &redis.XAddArgs{
Stream: streamName,
ID: "*",
Values: map[string]interface{}{"field": "value"},
}).Err())

require.NoError(t, rdb.XGroupCreate(ctx, streamName, groupName, "0").Err())
_, err := rdb.XReadGroup(ctx, &redis.XReadGroupArgs{
Group: groupName,
Consumer: consumerName,
Streams: []string{streamName, ">"},
Count: 1,
}).Result()
require.NoError(t, err)

time.Sleep(500 * time.Millisecond)

consumers, err := rdb.XInfoConsumers(ctx, streamName, groupName).Result()
require.NoError(t, err)

var consumerInfo redis.XInfoConsumer
for _, c := range consumers {
if c.Name == consumerName {
consumerInfo = c
break
}
}

require.Equal(t, consumerName, consumerInfo.Name)
require.NoError(t, rdb.XGroupDelConsumer(ctx, streamName, groupName, consumerName).Err())

consumers, err = rdb.XInfoConsumers(ctx, streamName, groupName).Result()
require.NoError(t, err)

for _, c := range consumers {
require.NotEqual(t, consumerName, c.Name)
}
})

t.Run("XREAD After XGroupCreate and XGroupCreateConsumer, for issue #2109", func(t *testing.T) {
streamName := "test-stream"
group := "group"
Expand Down

0 comments on commit 89c5b24

Please sign in to comment.