diff --git a/tests/gocase/unit/type/stream/stream_test.go b/tests/gocase/unit/type/stream/stream_test.go index 943127fcaaa..b50b230f6c0 100644 --- a/tests/gocase/unit/type/stream/stream_test.go +++ b/tests/gocase/unit/type/stream/stream_test.go @@ -1119,6 +1119,114 @@ func TestStreamOffset(t *testing.T) { require.Equal(t, msgID.ID, infoGroup.LastDeliveredID) }) + t.Run("XINFO Test idle time and pending messages, for issue #2478", func(t *testing.T) { + streamName := "test-stream-2478" + groupName := "test-group-2478" + consumerName := "test-consumer-2478" + + rdb.Del(ctx, streamName) + rdb.XGroupDestroy(ctx, streamName, groupName) + + for i := 1; i <= 5; i++ { + require.NoError(t, rdb.XAdd(ctx, &redis.XAddArgs{ + Stream: streamName, + ID: fmt.Sprintf("%d-0", i), + Values: map[string]interface{}{"field": fmt.Sprintf("value%d", i)}, + }).Err()) + } + + require.NoError(t, rdb.XGroupCreate(ctx, streamName, groupName, "0").Err()) + r, err := rdb.XReadGroup(ctx, &redis.XReadGroupArgs{ + Group: groupName, + Consumer: consumerName, + Streams: []string{streamName, ">"}, + Count: 5, + }).Result() + require.NoError(t, err) + require.Len(t, r[0].Messages, 5) + + time.Sleep(2 * time.Second) + + consumers, err := rdb.XInfoConsumers(ctx, streamName, groupName).Result() + require.NoError(t, err) + + var consumerInfo redis.XInfoConsumer + for _, c := range consumers { + if c.Name == consumerName { + consumerInfo = c + break + } + } + + require.True(t, consumerInfo.Idle >= 2000) + require.Equal(t, int64(5), consumerInfo.Pending) + + ackIDs := make([]string, 5) + for i := 1; i <= 5; i++ { + ackIDs[i-1] = fmt.Sprintf("%d-0", i) + } + require.NoError(t, rdb.XAck(ctx, streamName, groupName, ackIDs...).Err()) + + consumers, err = rdb.XInfoConsumers(ctx, streamName, groupName).Result() + require.NoError(t, err) + + for _, c := range consumers { + if c.Name == consumerName { + consumerInfo = c + break + } + } + + require.Equal(t, int64(0), consumerInfo.Pending) + }) + + t.Run("XINFO Test consumer removal and inactive time, for issue #2478", func(t *testing.T) { + streamName := "stream-test-2478" + groupName := "group-test-2478" + consumerName := "consumer-test-2478" + + rdb.Del(ctx, streamName) + rdb.XGroupDestroy(ctx, streamName, groupName) + + require.NoError(t, rdb.XAdd(ctx, &redis.XAddArgs{ + Stream: streamName, + ID: "*", + Values: map[string]interface{}{"field": "value"}, + }).Err()) + + require.NoError(t, rdb.XGroupCreate(ctx, streamName, groupName, "0").Err()) + _, err := rdb.XReadGroup(ctx, &redis.XReadGroupArgs{ + Group: groupName, + Consumer: consumerName, + Streams: []string{streamName, ">"}, + Count: 1, + }).Result() + require.NoError(t, err) + + time.Sleep(500 * time.Millisecond) + + consumers, err := rdb.XInfoConsumers(ctx, streamName, groupName).Result() + require.NoError(t, err) + + var consumerInfo redis.XInfoConsumer + for _, c := range consumers { + if c.Name == consumerName { + consumerInfo = c + break + } + } + + require.Equal(t, consumerName, consumerInfo.Name) + require.NoError(t, rdb.XGroupDelConsumer(ctx, streamName, groupName, consumerName).Err()) + + consumers, err = rdb.XInfoConsumers(ctx, streamName, groupName).Result() + require.NoError(t, err) + + for _, c := range consumers { + require.NotEqual(t, consumerName, c.Name) + } + }) + t.Run("XREAD After XGroupCreate and XGroupCreateConsumer, for issue #2109", func(t *testing.T) { streamName := "test-stream" group := "group"