From 84d57f6e44b7eb714e20a424a39fe8c9b5e5c9f8 Mon Sep 17 00:00:00 2001 From: Ladislav Thon Date: Fri, 17 Jan 2025 17:35:04 +0100 Subject: [PATCH] Redis Client: fix NPE when constructing XPendingSummary Certain fields that were deemed to be mandatory can in fact be `null`, as demonstrated in newly added tests. Also, this commit fixes translating `XTrimArgs` to the actual argument array in case `MAXLEN` is set to `0`. That is, in fact, a valid value, as demonstrated by the newly added tests and also by inspecting Redis source code [1]. [1] https://github.com/redis/redis/blob/7.2.4/src/t_stream.c#L941 --- .../datasource/stream/XPendingSummary.java | 6 +- .../redis/datasource/stream/XTrimArgs.java | 2 +- .../ReactiveStreamCommandsImpl.java | 10 +-- .../redis/datasource/StreamCommandsTest.java | 13 +++- .../TransactionalStreamCommandsTest.java | 69 +++++++++++++------ 5 files changed, 70 insertions(+), 30 deletions(-) diff --git a/extensions/redis-client/runtime/src/main/java/io/quarkus/redis/datasource/stream/XPendingSummary.java b/extensions/redis-client/runtime/src/main/java/io/quarkus/redis/datasource/stream/XPendingSummary.java index ee61127ca9751..815dac30b4cfb 100644 --- a/extensions/redis-client/runtime/src/main/java/io/quarkus/redis/datasource/stream/XPendingSummary.java +++ b/extensions/redis-client/runtime/src/main/java/io/quarkus/redis/datasource/stream/XPendingSummary.java @@ -37,7 +37,7 @@ public long getPendingCount() { /** * Gets the lowest message id that was not yet acknowledged. * - * @return the lowest message id + * @return the lowest message id; may be {@code null} */ public String getLowestId() { return lowestId; @@ -46,7 +46,7 @@ public String getLowestId() { /** * Gets the highest message id that was not yet acknowledged. * - * @return the highest message id + * @return the highest message id; may be {@code null} */ public String getHighestId() { return highestId; @@ -56,7 +56,7 @@ public String getHighestId() { * Get the list of every consumer in the consumer group with at least one pending message, * and the number of pending messages it has. * - * @return the map composed of consumer -> number of message + * @return the map composed of consumer -> number of message; may be empty */ public Map getConsumers() { return consumers; diff --git a/extensions/redis-client/runtime/src/main/java/io/quarkus/redis/datasource/stream/XTrimArgs.java b/extensions/redis-client/runtime/src/main/java/io/quarkus/redis/datasource/stream/XTrimArgs.java index 3d33cc56a2e8e..cb96a3da49d7d 100644 --- a/extensions/redis-client/runtime/src/main/java/io/quarkus/redis/datasource/stream/XTrimArgs.java +++ b/extensions/redis-client/runtime/src/main/java/io/quarkus/redis/datasource/stream/XTrimArgs.java @@ -66,7 +66,7 @@ public XTrimArgs limit(long limit) { public List toArgs() { List args = new ArrayList<>(); - if (maxlen > 0) { + if (maxlen >= 0) { if (minid != null) { throw new IllegalArgumentException("Cannot use `MAXLEN` and `MINID` together"); } diff --git a/extensions/redis-client/runtime/src/main/java/io/quarkus/redis/runtime/datasource/ReactiveStreamCommandsImpl.java b/extensions/redis-client/runtime/src/main/java/io/quarkus/redis/runtime/datasource/ReactiveStreamCommandsImpl.java index ca566c31b7dde..4ccba2e69cea0 100644 --- a/extensions/redis-client/runtime/src/main/java/io/quarkus/redis/runtime/datasource/ReactiveStreamCommandsImpl.java +++ b/extensions/redis-client/runtime/src/main/java/io/quarkus/redis/runtime/datasource/ReactiveStreamCommandsImpl.java @@ -370,12 +370,14 @@ protected XPendingSummary decodeAsXPendingSummary(Response r) { } var pending = r.get(0).toLong(); - var lowest = r.get(1).toString(); - var highest = r.get(2).toString(); + var lowest = r.get(1) != null ? r.get(1).toString() : null; + var highest = r.get(2) != null ? r.get(2).toString() : null; Map consumers = new HashMap<>(); - for (Response nested : r.get(3)) { - consumers.put(nested.get(0).toString(), nested.get(1).toLong()); + if (r.get(3) != null) { + for (Response nested : r.get(3)) { + consumers.put(nested.get(0).toString(), nested.get(1).toLong()); + } } return new XPendingSummary(pending, lowest, highest, consumers); diff --git a/extensions/redis-client/runtime/src/test/java/io/quarkus/redis/datasource/StreamCommandsTest.java b/extensions/redis-client/runtime/src/test/java/io/quarkus/redis/datasource/StreamCommandsTest.java index 5958c37f56d51..d491f69587c12 100644 --- a/extensions/redis-client/runtime/src/test/java/io/quarkus/redis/datasource/StreamCommandsTest.java +++ b/extensions/redis-client/runtime/src/test/java/io/quarkus/redis/datasource/StreamCommandsTest.java @@ -648,11 +648,22 @@ void xGroupSetIdWithArgs() { @Test void xPendingSummaryTest() { Map payload = Map.of("sensor-id", 1234, "temperature", 19); + + stream.xadd(key, payload); + stream.xtrim(key, new XTrimArgs().maxlen(0)); + + stream.xgroupCreate(key, "my-group", "0-0"); + + XPendingSummary summaryEmpty = stream.xpending(key, "my-group"); + assertThat(summaryEmpty.getPendingCount()).isEqualTo(0); + assertThat(summaryEmpty.getHighestId()).isNull(); + assertThat(summaryEmpty.getLowestId()).isNull(); + assertThat(summaryEmpty.getConsumers()).isEmpty(); + for (int i = 0; i < 100; i++) { stream.xadd(key, payload); } - stream.xgroupCreate(key, "my-group", "0-0"); List> messages = stream.xreadgroup("my-group", "consumer-123", key, ">"); assertThat(messages).hasSize(100); diff --git a/extensions/redis-client/runtime/src/test/java/io/quarkus/redis/datasource/TransactionalStreamCommandsTest.java b/extensions/redis-client/runtime/src/test/java/io/quarkus/redis/datasource/TransactionalStreamCommandsTest.java index df4d62e61a01c..dec431322a6fd 100644 --- a/extensions/redis-client/runtime/src/test/java/io/quarkus/redis/datasource/TransactionalStreamCommandsTest.java +++ b/extensions/redis-client/runtime/src/test/java/io/quarkus/redis/datasource/TransactionalStreamCommandsTest.java @@ -18,6 +18,7 @@ import io.quarkus.redis.datasource.stream.XAddArgs; import io.quarkus.redis.datasource.stream.XPendingArgs; import io.quarkus.redis.datasource.stream.XPendingSummary; +import io.quarkus.redis.datasource.stream.XTrimArgs; import io.quarkus.redis.datasource.transactions.TransactionResult; import io.quarkus.redis.runtime.datasource.BlockingRedisDataSourceImpl; import io.quarkus.redis.runtime.datasource.ReactiveRedisDataSourceImpl; @@ -48,31 +49,45 @@ public void streamBlocking() { TransactionalStreamCommands stream = tx.stream(String.class); assertThat(stream.getDataSource()).isEqualTo(tx); + stream.xadd(key, payload); + stream.xtrim(key, new XTrimArgs().maxlen(0)); + + stream.xgroupCreate(key, "g1", "0"); + + stream.xpending(key, "g1"); + stream.xadd(key, payload); stream.xadd(key, new XAddArgs().nomkstream(), payload); stream.xread(key, "0"); // 3 -> 2 messages - stream.xgroupCreate(key, "g1", "0"); stream.xreadgroup("g1", "c1", key, ">"); stream.xpending(key, "g1"); stream.xpending(key, "g1", StreamRange.of("-", "+"), 10, new XPendingArgs().consumer("c1")); }); - assertThat(result.size()).isEqualTo(7); + assertThat(result.size()).isEqualTo(10); assertThat(result.discarded()).isFalse(); + assertThat((String) result.get(0)).isNotBlank(); - assertThat((String) result.get(1)).isNotBlank(); + assertThat((Long) result.get(1)).isEqualTo(1); - String id1 = result.get(0); - String id2 = result.get(1); + assertThat(result. get(3).getPendingCount()).isEqualTo(0); + assertThat(result. get(3).getHighestId()).isNull(); + assertThat(result. get(3).getLowestId()).isNull(); + assertThat(result. get(3).getConsumers()).isEmpty(); - assertThat((List>) result.get(2)).hasSize(2); - assertThat((List>) result.get(4)).hasSize(2); + String id1 = result.get(4); + String id2 = result.get(5); - assertThat(((XPendingSummary) result.get(5)).getPendingCount()).isEqualTo(2); - List list = result.get(6); + assertThat((List>) result.get(6)).hasSize(2); + assertThat((List>) result.get(7)).hasSize(2); - assertThat(((List) result.get(6))).hasSize(2); + assertThat((result. get(8)).getPendingCount()).isEqualTo(2); + assertThat((result. get(8)).getHighestId()).isNotNull(); + assertThat((result. get(8)).getLowestId()).isNotNull(); + assertThat((result. get(8)).getConsumers()).hasSize(1); + List list = result.get(9); + assertThat(list).hasSize(2); List ids = list.stream().map(PendingMessage::getMessageId).collect(Collectors.toList()); assertThat(ids).containsExactly(id1, id2); } @@ -84,29 +99,41 @@ public void streamReactive() { assertThat(stream.getDataSource()).isEqualTo(tx); return stream.xadd(key, payload) - .chain((x) -> stream.xadd(key, new XAddArgs().nomkstream(), payload)) - .chain(x -> stream.xread(key, "0")) + .chain(x -> stream.xtrim(key, new XTrimArgs().maxlen(0))) .chain(x -> stream.xgroupCreate(key, "g1", "0")) + .chain(x -> stream.xpending(key, "g1")) + .chain(x -> stream.xadd(key, payload)) + .chain(x -> stream.xadd(key, new XAddArgs().nomkstream(), payload)) + .chain(x -> stream.xread(key, "0")) .chain(x -> stream.xreadgroup("g1", "c1", key, ">")) .chain(x -> stream.xpending(key, "g1")) .chain(x -> stream.xpending(key, "g1", StreamRange.of("-", "+"), 10)); }).await().indefinitely(); - assertThat(result.size()).isEqualTo(7); + assertThat(result.size()).isEqualTo(10); assertThat(result.discarded()).isFalse(); + assertThat((String) result.get(0)).isNotBlank(); - assertThat((String) result.get(1)).isNotBlank(); + assertThat((Long) result.get(1)).isEqualTo(1); + + assertThat(result. get(3).getPendingCount()).isEqualTo(0); + assertThat(result. get(3).getHighestId()).isNull(); + assertThat(result. get(3).getLowestId()).isNull(); + assertThat(result. get(3).getConsumers()).isEmpty(); - String id1 = result.get(0); - String id2 = result.get(1); + String id1 = result.get(4); + String id2 = result.get(5); - assertThat((List>) result.get(2)).hasSize(2); - assertThat((List>) result.get(4)).hasSize(2); + assertThat((List>) result.get(6)).hasSize(2); + assertThat((List>) result.get(7)).hasSize(2); - assertThat(((XPendingSummary) result.get(5)).getPendingCount()).isEqualTo(2); - List list = result.get(6); + assertThat((result. get(8)).getPendingCount()).isEqualTo(2); + assertThat((result. get(8)).getHighestId()).isNotNull(); + assertThat((result. get(8)).getLowestId()).isNotNull(); + assertThat((result. get(8)).getConsumers()).hasSize(1); - assertThat(((List) result.get(6))).hasSize(2); + List list = result.get(9); + assertThat(list).hasSize(2); List ids = list.stream().map(PendingMessage::getMessageId).collect(Collectors.toList()); assertThat(ids).containsExactly(id1, id2); }