Skip to content

Commit

Permalink
Redis Client: fix NPE when constructing XPendingSummary
Browse files Browse the repository at this point in the history
Certain fields that were deemed to be mandatory can in fact be `null`,
as demonstrated in newly added tests.

Also, this commit fixes translating `XTrimArgs` to the actual argument
array in case `MAXLEN` is set to `0`. That is, in fact, a valid value,
as demonstrated by the newly added tests and also by inspecting Redis
source code [1].

[1] https://github.com/redis/redis/blob/7.2.4/src/t_stream.c#L941
  • Loading branch information
Ladicek committed Jan 17, 2025
1 parent a09ea63 commit 84d57f6
Show file tree
Hide file tree
Showing 5 changed files with 70 additions and 30 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ public long getPendingCount() {
/**
* Gets the lowest message id that was not yet acknowledged.
*
* @return the lowest message id
* @return the lowest message id; may be {@code null}
*/
public String getLowestId() {
return lowestId;
Expand All @@ -46,7 +46,7 @@ public String getLowestId() {
/**
* Gets the highest message id that was not yet acknowledged.
*
* @return the highest message id
* @return the highest message id; may be {@code null}
*/
public String getHighestId() {
return highestId;
Expand All @@ -56,7 +56,7 @@ public String getHighestId() {
* Get the list of every consumer in the consumer group with at least one pending message,
* and the number of pending messages it has.
*
* @return the map composed of consumer -> number of message
* @return the map composed of consumer -> number of message; may be empty
*/
public Map<String, Long> getConsumers() {
return consumers;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ public XTrimArgs limit(long limit) {
public List<Object> toArgs() {
List<Object> args = new ArrayList<>();

if (maxlen > 0) {
if (maxlen >= 0) {
if (minid != null) {
throw new IllegalArgumentException("Cannot use `MAXLEN` and `MINID` together");
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -370,12 +370,14 @@ protected XPendingSummary decodeAsXPendingSummary(Response r) {
}

var pending = r.get(0).toLong();
var lowest = r.get(1).toString();
var highest = r.get(2).toString();
var lowest = r.get(1) != null ? r.get(1).toString() : null;
var highest = r.get(2) != null ? r.get(2).toString() : null;

Map<String, Long> consumers = new HashMap<>();
for (Response nested : r.get(3)) {
consumers.put(nested.get(0).toString(), nested.get(1).toLong());
if (r.get(3) != null) {
for (Response nested : r.get(3)) {
consumers.put(nested.get(0).toString(), nested.get(1).toLong());
}
}

return new XPendingSummary(pending, lowest, highest, consumers);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -648,11 +648,22 @@ void xGroupSetIdWithArgs() {
@Test
void xPendingSummaryTest() {
Map<String, Integer> payload = Map.of("sensor-id", 1234, "temperature", 19);

stream.xadd(key, payload);
stream.xtrim(key, new XTrimArgs().maxlen(0));

stream.xgroupCreate(key, "my-group", "0-0");

XPendingSummary summaryEmpty = stream.xpending(key, "my-group");
assertThat(summaryEmpty.getPendingCount()).isEqualTo(0);
assertThat(summaryEmpty.getHighestId()).isNull();
assertThat(summaryEmpty.getLowestId()).isNull();
assertThat(summaryEmpty.getConsumers()).isEmpty();

for (int i = 0; i < 100; i++) {
stream.xadd(key, payload);
}

stream.xgroupCreate(key, "my-group", "0-0");
List<StreamMessage<String, String, Integer>> messages = stream.xreadgroup("my-group", "consumer-123", key, ">");
assertThat(messages).hasSize(100);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
import io.quarkus.redis.datasource.stream.XAddArgs;
import io.quarkus.redis.datasource.stream.XPendingArgs;
import io.quarkus.redis.datasource.stream.XPendingSummary;
import io.quarkus.redis.datasource.stream.XTrimArgs;
import io.quarkus.redis.datasource.transactions.TransactionResult;
import io.quarkus.redis.runtime.datasource.BlockingRedisDataSourceImpl;
import io.quarkus.redis.runtime.datasource.ReactiveRedisDataSourceImpl;
Expand Down Expand Up @@ -48,31 +49,45 @@ public void streamBlocking() {
TransactionalStreamCommands<String, String, String> stream = tx.stream(String.class);
assertThat(stream.getDataSource()).isEqualTo(tx);

stream.xadd(key, payload);
stream.xtrim(key, new XTrimArgs().maxlen(0));

stream.xgroupCreate(key, "g1", "0");

stream.xpending(key, "g1");

stream.xadd(key, payload);
stream.xadd(key, new XAddArgs().nomkstream(), payload);

stream.xread(key, "0"); // 3 -> 2 messages
stream.xgroupCreate(key, "g1", "0");
stream.xreadgroup("g1", "c1", key, ">");
stream.xpending(key, "g1");
stream.xpending(key, "g1", StreamRange.of("-", "+"), 10, new XPendingArgs().consumer("c1"));

});
assertThat(result.size()).isEqualTo(7);
assertThat(result.size()).isEqualTo(10);
assertThat(result.discarded()).isFalse();

assertThat((String) result.get(0)).isNotBlank();
assertThat((String) result.get(1)).isNotBlank();
assertThat((Long) result.get(1)).isEqualTo(1);

String id1 = result.get(0);
String id2 = result.get(1);
assertThat(result.<XPendingSummary> get(3).getPendingCount()).isEqualTo(0);
assertThat(result.<XPendingSummary> get(3).getHighestId()).isNull();
assertThat(result.<XPendingSummary> get(3).getLowestId()).isNull();
assertThat(result.<XPendingSummary> get(3).getConsumers()).isEmpty();

assertThat((List<StreamMessage<String, String, String>>) result.get(2)).hasSize(2);
assertThat((List<StreamMessage<String, String, String>>) result.get(4)).hasSize(2);
String id1 = result.get(4);
String id2 = result.get(5);

assertThat(((XPendingSummary) result.get(5)).getPendingCount()).isEqualTo(2);
List<PendingMessage> list = result.get(6);
assertThat((List<StreamMessage<String, String, String>>) result.get(6)).hasSize(2);
assertThat((List<StreamMessage<String, String, String>>) result.get(7)).hasSize(2);

assertThat(((List<PendingMessage>) result.get(6))).hasSize(2);
assertThat((result.<XPendingSummary> get(8)).getPendingCount()).isEqualTo(2);
assertThat((result.<XPendingSummary> get(8)).getHighestId()).isNotNull();
assertThat((result.<XPendingSummary> get(8)).getLowestId()).isNotNull();
assertThat((result.<XPendingSummary> get(8)).getConsumers()).hasSize(1);
List<PendingMessage> list = result.get(9);
assertThat(list).hasSize(2);
List<String> ids = list.stream().map(PendingMessage::getMessageId).collect(Collectors.toList());
assertThat(ids).containsExactly(id1, id2);
}
Expand All @@ -84,29 +99,41 @@ public void streamReactive() {
assertThat(stream.getDataSource()).isEqualTo(tx);

return stream.xadd(key, payload)
.chain((x) -> stream.xadd(key, new XAddArgs().nomkstream(), payload))
.chain(x -> stream.xread(key, "0"))
.chain(x -> stream.xtrim(key, new XTrimArgs().maxlen(0)))
.chain(x -> stream.xgroupCreate(key, "g1", "0"))
.chain(x -> stream.xpending(key, "g1"))
.chain(x -> stream.xadd(key, payload))
.chain(x -> stream.xadd(key, new XAddArgs().nomkstream(), payload))
.chain(x -> stream.xread(key, "0"))
.chain(x -> stream.xreadgroup("g1", "c1", key, ">"))
.chain(x -> stream.xpending(key, "g1"))
.chain(x -> stream.xpending(key, "g1", StreamRange.of("-", "+"), 10));

}).await().indefinitely();
assertThat(result.size()).isEqualTo(7);
assertThat(result.size()).isEqualTo(10);
assertThat(result.discarded()).isFalse();

assertThat((String) result.get(0)).isNotBlank();
assertThat((String) result.get(1)).isNotBlank();
assertThat((Long) result.get(1)).isEqualTo(1);

assertThat(result.<XPendingSummary> get(3).getPendingCount()).isEqualTo(0);
assertThat(result.<XPendingSummary> get(3).getHighestId()).isNull();
assertThat(result.<XPendingSummary> get(3).getLowestId()).isNull();
assertThat(result.<XPendingSummary> get(3).getConsumers()).isEmpty();

String id1 = result.get(0);
String id2 = result.get(1);
String id1 = result.get(4);
String id2 = result.get(5);

assertThat((List<StreamMessage<String, String, String>>) result.get(2)).hasSize(2);
assertThat((List<StreamMessage<String, String, String>>) result.get(4)).hasSize(2);
assertThat((List<StreamMessage<String, String, String>>) result.get(6)).hasSize(2);
assertThat((List<StreamMessage<String, String, String>>) result.get(7)).hasSize(2);

assertThat(((XPendingSummary) result.get(5)).getPendingCount()).isEqualTo(2);
List<PendingMessage> list = result.get(6);
assertThat((result.<XPendingSummary> get(8)).getPendingCount()).isEqualTo(2);
assertThat((result.<XPendingSummary> get(8)).getHighestId()).isNotNull();
assertThat((result.<XPendingSummary> get(8)).getLowestId()).isNotNull();
assertThat((result.<XPendingSummary> get(8)).getConsumers()).hasSize(1);

assertThat(((List<PendingMessage>) result.get(6))).hasSize(2);
List<PendingMessage> list = result.get(9);
assertThat(list).hasSize(2);
List<String> ids = list.stream().map(PendingMessage::getMessageId).collect(Collectors.toList());
assertThat(ids).containsExactly(id1, id2);
}
Expand Down

0 comments on commit 84d57f6

Please sign in to comment.