From 91b5478e8982d5814442def8d0850d0555cc69b2 Mon Sep 17 00:00:00 2001 From: Edward Xu <14014471+LindaSummer@users.noreply.github.com> Date: Fri, 15 Nov 2024 12:27:39 +0800 Subject: [PATCH] fix(stream): add `KeyRangeGen` for `XRead` and `XReadGroup` (#2657) Co-authored-by: Twice --- src/commands/cmd_stream.cc | 29 ++++- tests/gocase/unit/command/command_test.go | 134 ++++++++++++++++++++++ 2 files changed, 161 insertions(+), 2 deletions(-) diff --git a/src/commands/cmd_stream.cc b/src/commands/cmd_stream.cc index 0fe1cd3bccf..89650bda82b 100644 --- a/src/commands/cmd_stream.cc +++ b/src/commands/cmd_stream.cc @@ -33,6 +33,22 @@ #include "types/redis_stream.h" namespace redis { +namespace { +// for XRead and XReadGroup stream range parse. +CommandKeyRange ParseStreamReadRange(const std::vector &args, uint32_t start_offset) { + // assert here we must have a stream in args since it has been parsed. + auto stream_keyword_iter = std::find_if(std::next(args.cbegin(), start_offset), args.cend(), + [](const std::string &arg) { return util::EqualICase(arg, "streams"); }); + int stream_pos = static_cast(std::distance(args.cbegin(), stream_keyword_iter)); + int stream_size = static_cast(args.size() - stream_pos) / 2; + + CommandKeyRange range; + range.first_key = stream_pos + 1; + range.key_step = 1; + range.last_key = range.first_key + stream_size - 1; + return range; +} +} // namespace class CommandXAck : public Commander { public: @@ -1404,6 +1420,10 @@ class CommandXRead : public Commander, bufferevent_enable(bev, EV_READ); } + static const inline CommandKeyRangeGen keyRangeGen = [](const std::vector &args) { + return ParseStreamReadRange(args, 0); + }; + private: std::vector streams_; std::vector ids_; @@ -1715,6 +1735,10 @@ class CommandXReadGroup : public Commander, bufferevent_enable(bev, EV_READ); } + static const inline CommandKeyRangeGen keyRangeGen = [](const std::vector &args) { + return ParseStreamReadRange(args, 4); + }; + private: std::vector streams_; std::vector ids_; @@ -1896,8 +1920,9 @@ REDIS_REGISTER_COMMANDS(Stream, MakeCmdAttr("xack", -4, "write no-d MakeCmdAttr("xpending", -3, "read-only", 1, 1, 1), MakeCmdAttr("xrange", -4, "read-only", 1, 1, 1), MakeCmdAttr("xrevrange", -2, "read-only", 1, 1, 1), - MakeCmdAttr("xread", -4, "read-only blocking", NO_KEY), - MakeCmdAttr("xreadgroup", -7, "write blocking", NO_KEY), + MakeCmdAttr("xread", -4, "read-only blocking", CommandXRead::keyRangeGen), + MakeCmdAttr("xreadgroup", -7, "write blocking", + CommandXReadGroup::keyRangeGen), MakeCmdAttr("xtrim", -4, "write no-dbsize-check", 1, 1, 1), MakeCmdAttr("xsetid", -3, "write", 1, 1, 1)) diff --git a/tests/gocase/unit/command/command_test.go b/tests/gocase/unit/command/command_test.go index d6f233fa520..27a6d5360ea 100644 --- a/tests/gocase/unit/command/command_test.go +++ b/tests/gocase/unit/command/command_test.go @@ -289,4 +289,138 @@ func TestCommand(t *testing.T) { require.Equal(t, "dst", vs[0]) require.Equal(t, "src", vs[1]) }) + + t.Run("COMMAND GETKEYS XREAD", func(t *testing.T) { + { + r := rdb.Do(ctx, "COMMAND", "GETKEYS", "XREAD", + "STREAMS", "k0-1", "k0-2", "0-0", "0-0") + vs, err := r.Slice() + require.NoError(t, err) + require.Len(t, vs, 2) + require.Equal(t, "k0-1", vs[0]) + require.Equal(t, "k0-2", vs[1]) + } + + { + r := rdb.Do(ctx, "COMMAND", "GETKEYS", "XREAD", "COUNT", "10", + "STREAMS", "k1-1", "k1-2", "0-0", "0-0") + vs, err := r.Slice() + require.NoError(t, err) + require.Len(t, vs, 2) + require.Equal(t, "k1-1", vs[0]) + require.Equal(t, "k1-2", vs[1]) + } + + { + r := rdb.Do(ctx, "COMMAND", "GETKEYS", "XREAD", "BLOCK", "1000", + "STREAMS", "k2-1", "k2-2", "0-0", "0-0") + vs, err := r.Slice() + require.NoError(t, err) + require.Len(t, vs, 2) + require.Equal(t, "k2-1", vs[0]) + require.Equal(t, "k2-2", vs[1]) + } + + { + r := rdb.Do(ctx, "COMMAND", "GETKEYS", "XREAD", "COUNT", "10", + "BLOCK", "1000", "STREAMS", "k3-1", "k3-2", "0-0", "0-0") + vs, err := r.Slice() + require.NoError(t, err) + require.Len(t, vs, 2) + require.Equal(t, "k3-1", vs[0]) + require.Equal(t, "k3-2", vs[1]) + } + }) + + t.Run("COMMAND GETKEYS XREADGROUP", func(t *testing.T) { + { + r := rdb.Do(ctx, "COMMAND", "GETKEYS", "XREADGROUP", "GROUP", "group1", "consumer1", + "STREAMS", "gk0-1", "gk0-2", "0-0", "0-0") + vs, err := r.Slice() + require.NoError(t, err) + require.Len(t, vs, 2) + require.Equal(t, "gk0-1", vs[0]) + require.Equal(t, "gk0-2", vs[1]) + } + + { + r := rdb.Do(ctx, "COMMAND", "GETKEYS", "XREADGROUP", "GROUP", "streams", "streams", + "STREAMS", "gk1-1", "gk1-2", "0-0", "0-0") + vs, err := r.Slice() + require.NoError(t, err) + require.Len(t, vs, 2) + require.Equal(t, "gk1-1", vs[0]) + require.Equal(t, "gk1-2", vs[1]) + } + + { + r := rdb.Do(ctx, "COMMAND", "GETKEYS", "XREADGROUP", "GROUP", "group1", "consumer1", + "COUNT", "10", "STREAMS", "gk3-1", "gk3-2", "0-0", "0-0") + vs, err := r.Slice() + require.NoError(t, err) + require.Len(t, vs, 2) + require.Equal(t, "gk3-1", vs[0]) + require.Equal(t, "gk3-2", vs[1]) + } + + { + r := rdb.Do(ctx, "COMMAND", "GETKEYS", "XREADGROUP", "GROUP", "group1", "consumer1", + "BLOCK", "10", "STREAMS", "gk4-1", "gk4-2", "0-0", "0-0") + vs, err := r.Slice() + require.NoError(t, err) + require.Len(t, vs, 2) + require.Equal(t, "gk4-1", vs[0]) + require.Equal(t, "gk4-2", vs[1]) + } + + { + r := rdb.Do(ctx, "COMMAND", "GETKEYS", "XREADGROUP", "GROUP", "group1", "consumer1", + "NOACK", "STREAMS", "gk5-1", "gk5-2", "0-0", "0-0") + vs, err := r.Slice() + require.NoError(t, err) + require.Len(t, vs, 2) + require.Equal(t, "gk5-1", vs[0]) + require.Equal(t, "gk5-2", vs[1]) + } + + { + r := rdb.Do(ctx, "COMMAND", "GETKEYS", "XREADGROUP", "GROUP", "group1", "consumer1", + "COUNT", "10", "NOACK", "STREAMS", "gk6-1", "gk6-2", "0-0", "0-0") + vs, err := r.Slice() + require.NoError(t, err) + require.Len(t, vs, 2) + require.Equal(t, "gk6-1", vs[0]) + require.Equal(t, "gk6-2", vs[1]) + } + + { + r := rdb.Do(ctx, "COMMAND", "GETKEYS", "XREADGROUP", "GROUP", "group1", "consumer1", + "BLOCK", "1000", "NOACK", "STREAMS", "gk7-1", "gk7-2", "0-0", "0-0") + vs, err := r.Slice() + require.NoError(t, err) + require.Len(t, vs, 2) + require.Equal(t, "gk7-1", vs[0]) + require.Equal(t, "gk7-2", vs[1]) + } + + { + r := rdb.Do(ctx, "COMMAND", "GETKEYS", "XREADGROUP", "GROUP", "group1", "consumer1", + "COUNT", "10", "BLOCK", "1000", "STREAMS", "gk8-1", "gk8-2", "0-0", "0-0") + vs, err := r.Slice() + require.NoError(t, err) + require.Len(t, vs, 2) + require.Equal(t, "gk8-1", vs[0]) + require.Equal(t, "gk8-2", vs[1]) + } + + { + r := rdb.Do(ctx, "COMMAND", "GETKEYS", "XREADGROUP", "GROUP", "group1", "consumer1", + "COUNT", "10", "BLOCK", "1000", "NOACK", "STREAMS", "gk9-1", "gk9-2", "0-0", "0-0") + vs, err := r.Slice() + require.NoError(t, err) + require.Len(t, vs, 2) + require.Equal(t, "gk9-1", vs[0]) + require.Equal(t, "gk9-2", vs[1]) + } + }) }