Skip to content

Commit

Permalink
fix(stream): add KeyRangeGen for XRead and XReadGroup (#2657)
Browse files Browse the repository at this point in the history
Co-authored-by: Twice <[email protected]>
  • Loading branch information
LindaSummer and PragmaTwice authored Nov 15, 2024
1 parent 6747b8d commit 91b5478
Show file tree
Hide file tree
Showing 2 changed files with 161 additions and 2 deletions.
29 changes: 27 additions & 2 deletions src/commands/cmd_stream.cc
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,22 @@
#include "types/redis_stream.h"

namespace redis {
namespace {
// for XRead and XReadGroup stream range parse.
CommandKeyRange ParseStreamReadRange(const std::vector<std::string> &args, uint32_t start_offset) {
// assert here we must have a stream in args since it has been parsed.
auto stream_keyword_iter = std::find_if(std::next(args.cbegin(), start_offset), args.cend(),
[](const std::string &arg) { return util::EqualICase(arg, "streams"); });
int stream_pos = static_cast<int>(std::distance(args.cbegin(), stream_keyword_iter));
int stream_size = static_cast<int>(args.size() - stream_pos) / 2;

CommandKeyRange range;
range.first_key = stream_pos + 1;
range.key_step = 1;
range.last_key = range.first_key + stream_size - 1;
return range;
}
} // namespace

class CommandXAck : public Commander {
public:
Expand Down Expand Up @@ -1404,6 +1420,10 @@ class CommandXRead : public Commander,
bufferevent_enable(bev, EV_READ);
}

static const inline CommandKeyRangeGen keyRangeGen = [](const std::vector<std::string> &args) {
return ParseStreamReadRange(args, 0);
};

private:
std::vector<std::string> streams_;
std::vector<StreamEntryID> ids_;
Expand Down Expand Up @@ -1715,6 +1735,10 @@ class CommandXReadGroup : public Commander,
bufferevent_enable(bev, EV_READ);
}

static const inline CommandKeyRangeGen keyRangeGen = [](const std::vector<std::string> &args) {
return ParseStreamReadRange(args, 4);
};

private:
std::vector<std::string> streams_;
std::vector<StreamEntryID> ids_;
Expand Down Expand Up @@ -1896,8 +1920,9 @@ REDIS_REGISTER_COMMANDS(Stream, MakeCmdAttr<CommandXAck>("xack", -4, "write no-d
MakeCmdAttr<CommandXPending>("xpending", -3, "read-only", 1, 1, 1),
MakeCmdAttr<CommandXRange>("xrange", -4, "read-only", 1, 1, 1),
MakeCmdAttr<CommandXRevRange>("xrevrange", -2, "read-only", 1, 1, 1),
MakeCmdAttr<CommandXRead>("xread", -4, "read-only blocking", NO_KEY),
MakeCmdAttr<CommandXReadGroup>("xreadgroup", -7, "write blocking", NO_KEY),
MakeCmdAttr<CommandXRead>("xread", -4, "read-only blocking", CommandXRead::keyRangeGen),
MakeCmdAttr<CommandXReadGroup>("xreadgroup", -7, "write blocking",
CommandXReadGroup::keyRangeGen),
MakeCmdAttr<CommandXTrim>("xtrim", -4, "write no-dbsize-check", 1, 1, 1),
MakeCmdAttr<CommandXSetId>("xsetid", -3, "write", 1, 1, 1))

Expand Down
134 changes: 134 additions & 0 deletions tests/gocase/unit/command/command_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -289,4 +289,138 @@ func TestCommand(t *testing.T) {
require.Equal(t, "dst", vs[0])
require.Equal(t, "src", vs[1])
})

t.Run("COMMAND GETKEYS XREAD", func(t *testing.T) {
{
r := rdb.Do(ctx, "COMMAND", "GETKEYS", "XREAD",
"STREAMS", "k0-1", "k0-2", "0-0", "0-0")
vs, err := r.Slice()
require.NoError(t, err)
require.Len(t, vs, 2)
require.Equal(t, "k0-1", vs[0])
require.Equal(t, "k0-2", vs[1])
}

{
r := rdb.Do(ctx, "COMMAND", "GETKEYS", "XREAD", "COUNT", "10",
"STREAMS", "k1-1", "k1-2", "0-0", "0-0")
vs, err := r.Slice()
require.NoError(t, err)
require.Len(t, vs, 2)
require.Equal(t, "k1-1", vs[0])
require.Equal(t, "k1-2", vs[1])
}

{
r := rdb.Do(ctx, "COMMAND", "GETKEYS", "XREAD", "BLOCK", "1000",
"STREAMS", "k2-1", "k2-2", "0-0", "0-0")
vs, err := r.Slice()
require.NoError(t, err)
require.Len(t, vs, 2)
require.Equal(t, "k2-1", vs[0])
require.Equal(t, "k2-2", vs[1])
}

{
r := rdb.Do(ctx, "COMMAND", "GETKEYS", "XREAD", "COUNT", "10",
"BLOCK", "1000", "STREAMS", "k3-1", "k3-2", "0-0", "0-0")
vs, err := r.Slice()
require.NoError(t, err)
require.Len(t, vs, 2)
require.Equal(t, "k3-1", vs[0])
require.Equal(t, "k3-2", vs[1])
}
})

t.Run("COMMAND GETKEYS XREADGROUP", func(t *testing.T) {
{
r := rdb.Do(ctx, "COMMAND", "GETKEYS", "XREADGROUP", "GROUP", "group1", "consumer1",
"STREAMS", "gk0-1", "gk0-2", "0-0", "0-0")
vs, err := r.Slice()
require.NoError(t, err)
require.Len(t, vs, 2)
require.Equal(t, "gk0-1", vs[0])
require.Equal(t, "gk0-2", vs[1])
}

{
r := rdb.Do(ctx, "COMMAND", "GETKEYS", "XREADGROUP", "GROUP", "streams", "streams",
"STREAMS", "gk1-1", "gk1-2", "0-0", "0-0")
vs, err := r.Slice()
require.NoError(t, err)
require.Len(t, vs, 2)
require.Equal(t, "gk1-1", vs[0])
require.Equal(t, "gk1-2", vs[1])
}

{
r := rdb.Do(ctx, "COMMAND", "GETKEYS", "XREADGROUP", "GROUP", "group1", "consumer1",
"COUNT", "10", "STREAMS", "gk3-1", "gk3-2", "0-0", "0-0")
vs, err := r.Slice()
require.NoError(t, err)
require.Len(t, vs, 2)
require.Equal(t, "gk3-1", vs[0])
require.Equal(t, "gk3-2", vs[1])
}

{
r := rdb.Do(ctx, "COMMAND", "GETKEYS", "XREADGROUP", "GROUP", "group1", "consumer1",
"BLOCK", "10", "STREAMS", "gk4-1", "gk4-2", "0-0", "0-0")
vs, err := r.Slice()
require.NoError(t, err)
require.Len(t, vs, 2)
require.Equal(t, "gk4-1", vs[0])
require.Equal(t, "gk4-2", vs[1])
}

{
r := rdb.Do(ctx, "COMMAND", "GETKEYS", "XREADGROUP", "GROUP", "group1", "consumer1",
"NOACK", "STREAMS", "gk5-1", "gk5-2", "0-0", "0-0")
vs, err := r.Slice()
require.NoError(t, err)
require.Len(t, vs, 2)
require.Equal(t, "gk5-1", vs[0])
require.Equal(t, "gk5-2", vs[1])
}

{
r := rdb.Do(ctx, "COMMAND", "GETKEYS", "XREADGROUP", "GROUP", "group1", "consumer1",
"COUNT", "10", "NOACK", "STREAMS", "gk6-1", "gk6-2", "0-0", "0-0")
vs, err := r.Slice()
require.NoError(t, err)
require.Len(t, vs, 2)
require.Equal(t, "gk6-1", vs[0])
require.Equal(t, "gk6-2", vs[1])
}

{
r := rdb.Do(ctx, "COMMAND", "GETKEYS", "XREADGROUP", "GROUP", "group1", "consumer1",
"BLOCK", "1000", "NOACK", "STREAMS", "gk7-1", "gk7-2", "0-0", "0-0")
vs, err := r.Slice()
require.NoError(t, err)
require.Len(t, vs, 2)
require.Equal(t, "gk7-1", vs[0])
require.Equal(t, "gk7-2", vs[1])
}

{
r := rdb.Do(ctx, "COMMAND", "GETKEYS", "XREADGROUP", "GROUP", "group1", "consumer1",
"COUNT", "10", "BLOCK", "1000", "STREAMS", "gk8-1", "gk8-2", "0-0", "0-0")
vs, err := r.Slice()
require.NoError(t, err)
require.Len(t, vs, 2)
require.Equal(t, "gk8-1", vs[0])
require.Equal(t, "gk8-2", vs[1])
}

{
r := rdb.Do(ctx, "COMMAND", "GETKEYS", "XREADGROUP", "GROUP", "group1", "consumer1",
"COUNT", "10", "BLOCK", "1000", "NOACK", "STREAMS", "gk9-1", "gk9-2", "0-0", "0-0")
vs, err := r.Slice()
require.NoError(t, err)
require.Len(t, vs, 2)
require.Equal(t, "gk9-1", vs[0])
require.Equal(t, "gk9-2", vs[1])
}
})
}

0 comments on commit 91b5478

Please sign in to comment.