Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Adds support for XPENDING IDLE parameter #2822

Open
wants to merge 3 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion docs/ReleaseNotes.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,9 @@ Current package versions:
| [![StackExchange.Redis](https://img.shields.io/nuget/v/StackExchange.Redis.svg)](https://www.nuget.org/packages/StackExchange.Redis/) | [![StackExchange.Redis](https://img.shields.io/nuget/vpre/StackExchange.Redis.svg)](https://www.nuget.org/packages/StackExchange.Redis/) | [![StackExchange.Redis MyGet](https://img.shields.io/myget/stackoverflow/vpre/StackExchange.Redis.svg)](https://www.myget.org/feed/stackoverflow/package/nuget/StackExchange.Redis) |

## Unreleased
No pending unreleased changes.

- Add support for XPENDING Idle time filter ([#2822 by david-brink-talogy](https://github.com/StackExchange/StackExchange.Redis/pull/2822))


## 2.8.22

Expand Down
20 changes: 18 additions & 2 deletions src/StackExchange.Redis/Interfaces/IDatabase.cs
Original file line number Diff line number Diff line change
Expand Up @@ -2649,6 +2649,21 @@ IEnumerable<SortedSetEntry> SortedSetScan(
/// <remarks><seealso href="https://redis.io/commands/xpending"/></remarks>
StreamPendingInfo StreamPending(RedisKey key, RedisValue groupName, CommandFlags flags = CommandFlags.None);

/// <summary>
/// View information about each pending message.
/// </summary>
/// <param name="key">The key of the stream.</param>
/// <param name="groupName">The name of the consumer group.</param>
/// <param name="count">The maximum number of pending messages to return.</param>
/// <param name="consumerName">The consumer name for the pending messages. Pass RedisValue.Null to include pending messages for all consumers.</param>
/// <param name="minId">The minimum ID from which to read the stream of pending messages. Pass null to read from the beginning of the stream.</param>
/// <param name="maxId">The maximum ID to read to within the stream of pending messages. Pass null to read to the end of the stream.</param>
/// <param name="flags">The flags to use for this operation.</param>
/// <returns>An instance of <see cref="StreamPendingMessageInfo"/> for each pending message.</returns>
/// <remarks>Equivalent of calling XPENDING key group start-id end-id count consumer-name.</remarks>
/// <remarks><seealso href="https://redis.io/commands/xpending"/></remarks>
StreamPendingMessageInfo[] StreamPendingMessages(RedisKey key, RedisValue groupName, int count, RedisValue consumerName, RedisValue? minId, RedisValue? maxId, CommandFlags flags);

/// <summary>
/// View information about each pending message.
/// </summary>
Expand All @@ -2658,11 +2673,12 @@ IEnumerable<SortedSetEntry> SortedSetScan(
/// <param name="consumerName">The consumer name for the pending messages. Pass RedisValue.Null to include pending messages for all consumers.</param>
/// <param name="minId">The minimum ID from which to read the stream of pending messages. The method will default to reading from the beginning of the stream.</param>
/// <param name="maxId">The maximum ID to read to within the stream of pending messages. The method will default to reading to the end of the stream.</param>
/// <param name="minIdleTimeInMs">The minimum idle time threshold for pending messages to be claimed.</param>
/// <param name="flags">The flags to use for this operation.</param>
/// <returns>An instance of <see cref="StreamPendingMessageInfo"/> for each pending message.</returns>
/// <remarks>Equivalent of calling XPENDING key group start-id end-id count consumer-name.</remarks>
/// <remarks>Equivalent of calling XPENDING key group IDLE min-idle-time start-id end-id count consumer-name.</remarks>
/// <remarks><seealso href="https://redis.io/commands/xpending"/></remarks>
StreamPendingMessageInfo[] StreamPendingMessages(RedisKey key, RedisValue groupName, int count, RedisValue consumerName, RedisValue? minId = null, RedisValue? maxId = null, CommandFlags flags = CommandFlags.None);
StreamPendingMessageInfo[] StreamPendingMessages(RedisKey key, RedisValue groupName, int count, RedisValue consumerName, RedisValue? minId = null, RedisValue? maxId = null, long? minIdleTimeInMs = null, CommandFlags flags = CommandFlags.None);
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we need to add an overload here, i.e. a second method with different parameters; otherwise, this is a hard binary break - we try very hard not to do that. If the compiler complains about two methods with optional parameters, we can work around that

(yes: technically adding methods to the interface is also problematic, but: it is problematic in different ways, and in reality we don't expect custom implementations of the IDatabase etc APIs)

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

+1 on this. I'd leave the existing public method signatures as-is, and add new methods with additional required params as needed.


/// <summary>
/// Read a stream using the given range of IDs.
Expand Down
5 changes: 4 additions & 1 deletion src/StackExchange.Redis/Interfaces/IDatabaseAsync.cs
Original file line number Diff line number Diff line change
Expand Up @@ -646,7 +646,10 @@ IAsyncEnumerable<SortedSetEntry> SortedSetScanAsync(
Task<StreamPendingInfo> StreamPendingAsync(RedisKey key, RedisValue groupName, CommandFlags flags = CommandFlags.None);

/// <inheritdoc cref="IDatabase.StreamPendingMessages(RedisKey, RedisValue, int, RedisValue, RedisValue?, RedisValue?, CommandFlags)"/>
Task<StreamPendingMessageInfo[]> StreamPendingMessagesAsync(RedisKey key, RedisValue groupName, int count, RedisValue consumerName, RedisValue? minId = null, RedisValue? maxId = null, CommandFlags flags = CommandFlags.None);
Task<StreamPendingMessageInfo[]> StreamPendingMessagesAsync(RedisKey key, RedisValue groupName, int count, RedisValue consumerName, RedisValue? minId, RedisValue? maxId, CommandFlags flags);

/// <inheritdoc cref="IDatabase.StreamPendingMessages(RedisKey, RedisValue, int, RedisValue, RedisValue?, RedisValue?, long?, CommandFlags)"/>
Task<StreamPendingMessageInfo[]> StreamPendingMessagesAsync(RedisKey key, RedisValue groupName, int count, RedisValue consumerName, RedisValue? minId = null, RedisValue? maxId = null, long? minIdleTimeInMs = null, CommandFlags flags = CommandFlags.None);

/// <inheritdoc cref="IDatabase.StreamRange(RedisKey, RedisValue?, RedisValue?, int?, Order, CommandFlags)"/>
Task<StreamEntry[]> StreamRangeAsync(RedisKey key, RedisValue? minId = null, RedisValue? maxId = null, int? count = null, Order messageOrder = Order.Ascending, CommandFlags flags = CommandFlags.None);
Expand Down
5 changes: 4 additions & 1 deletion src/StackExchange.Redis/KeyspaceIsolation/KeyPrefixed.cs
Original file line number Diff line number Diff line change
Expand Up @@ -615,9 +615,12 @@ public Task<bool> StreamDeleteConsumerGroupAsync(RedisKey key, RedisValue groupN
public Task<StreamPendingInfo> StreamPendingAsync(RedisKey key, RedisValue groupName, CommandFlags flags = CommandFlags.None) =>
Inner.StreamPendingAsync(ToInner(key), groupName, flags);

public Task<StreamPendingMessageInfo[]> StreamPendingMessagesAsync(RedisKey key, RedisValue groupName, int count, RedisValue consumerName, RedisValue? minId = null, RedisValue? maxId = null, CommandFlags flags = CommandFlags.None) =>
public Task<StreamPendingMessageInfo[]> StreamPendingMessagesAsync(RedisKey key, RedisValue groupName, int count, RedisValue consumerName, RedisValue? minId, RedisValue? maxId, CommandFlags flags) =>
Inner.StreamPendingMessagesAsync(ToInner(key), groupName, count, consumerName, minId, maxId, flags);

public Task<StreamPendingMessageInfo[]> StreamPendingMessagesAsync(RedisKey key, RedisValue groupName, int count, RedisValue consumerName, RedisValue? minId = null, RedisValue? maxId = null, long? minIdleTimeInMs = null, CommandFlags flags = CommandFlags.None) =>
Inner.StreamPendingMessagesAsync(ToInner(key), groupName, count, consumerName, minId, maxId, minIdleTimeInMs, flags);

public Task<StreamEntry[]> StreamRangeAsync(RedisKey key, RedisValue? minId = null, RedisValue? maxId = null, int? count = null, Order messageOrder = Order.Ascending, CommandFlags flags = CommandFlags.None) =>
Inner.StreamRangeAsync(ToInner(key), minId, maxId, count, messageOrder, flags);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -597,9 +597,12 @@ public bool StreamDeleteConsumerGroup(RedisKey key, RedisValue groupName, Comman
public StreamPendingInfo StreamPending(RedisKey key, RedisValue groupName, CommandFlags flags = CommandFlags.None) =>
Inner.StreamPending(ToInner(key), groupName, flags);

public StreamPendingMessageInfo[] StreamPendingMessages(RedisKey key, RedisValue groupName, int count, RedisValue consumerName, RedisValue? minId = null, RedisValue? maxId = null, CommandFlags flags = CommandFlags.None) =>
public StreamPendingMessageInfo[] StreamPendingMessages(RedisKey key, RedisValue groupName, int count, RedisValue consumerName, RedisValue? minId, RedisValue? maxId, CommandFlags flags) =>
Inner.StreamPendingMessages(ToInner(key), groupName, count, consumerName, minId, maxId, flags);

public StreamPendingMessageInfo[] StreamPendingMessages(RedisKey key, RedisValue groupName, int count, RedisValue consumerName, RedisValue? minId = null, RedisValue? maxId = null, long? minIdleTimeInMs = null, CommandFlags flags = CommandFlags.None) =>
Inner.StreamPendingMessages(ToInner(key), groupName, count, consumerName, minId, maxId, minIdleTimeInMs, flags);

public StreamEntry[] StreamRange(RedisKey key, RedisValue? minId = null, RedisValue? maxId = null, int? count = null, Order messageOrder = Order.Ascending, CommandFlags flags = CommandFlags.None) =>
Inner.StreamRange(ToInner(key), minId, maxId, count, messageOrder, flags);

Expand Down
6 changes: 4 additions & 2 deletions src/StackExchange.Redis/PublicAPI/PublicAPI.Shipped.txt
Original file line number Diff line number Diff line change
Expand Up @@ -734,7 +734,8 @@ StackExchange.Redis.IDatabase.StreamGroupInfo(StackExchange.Redis.RedisKey key,
StackExchange.Redis.IDatabase.StreamInfo(StackExchange.Redis.RedisKey key, StackExchange.Redis.CommandFlags flags = StackExchange.Redis.CommandFlags.None) -> StackExchange.Redis.StreamInfo
StackExchange.Redis.IDatabase.StreamLength(StackExchange.Redis.RedisKey key, StackExchange.Redis.CommandFlags flags = StackExchange.Redis.CommandFlags.None) -> long
StackExchange.Redis.IDatabase.StreamPending(StackExchange.Redis.RedisKey key, StackExchange.Redis.RedisValue groupName, StackExchange.Redis.CommandFlags flags = StackExchange.Redis.CommandFlags.None) -> StackExchange.Redis.StreamPendingInfo
StackExchange.Redis.IDatabase.StreamPendingMessages(StackExchange.Redis.RedisKey key, StackExchange.Redis.RedisValue groupName, int count, StackExchange.Redis.RedisValue consumerName, StackExchange.Redis.RedisValue? minId = null, StackExchange.Redis.RedisValue? maxId = null, StackExchange.Redis.CommandFlags flags = StackExchange.Redis.CommandFlags.None) -> StackExchange.Redis.StreamPendingMessageInfo[]!
StackExchange.Redis.IDatabase.StreamPendingMessages(StackExchange.Redis.RedisKey key, StackExchange.Redis.RedisValue groupName, int count, StackExchange.Redis.RedisValue consumerName, StackExchange.Redis.RedisValue? minId, StackExchange.Redis.RedisValue? maxId, StackExchange.Redis.CommandFlags flags) -> StackExchange.Redis.StreamPendingMessageInfo[]!
StackExchange.Redis.IDatabase.StreamPendingMessages(StackExchange.Redis.RedisKey key, StackExchange.Redis.RedisValue groupName, int count, StackExchange.Redis.RedisValue consumerName, StackExchange.Redis.RedisValue? minId = null, StackExchange.Redis.RedisValue? maxId = null, long? minIdleTimeInMs = null, StackExchange.Redis.CommandFlags flags = StackExchange.Redis.CommandFlags.None) -> StackExchange.Redis.StreamPendingMessageInfo[]!
StackExchange.Redis.IDatabase.StreamRange(StackExchange.Redis.RedisKey key, StackExchange.Redis.RedisValue? minId = null, StackExchange.Redis.RedisValue? maxId = null, int? count = null, StackExchange.Redis.Order messageOrder = StackExchange.Redis.Order.Ascending, StackExchange.Redis.CommandFlags flags = StackExchange.Redis.CommandFlags.None) -> StackExchange.Redis.StreamEntry[]!
StackExchange.Redis.IDatabase.StreamRead(StackExchange.Redis.RedisKey key, StackExchange.Redis.RedisValue position, int? count = null, StackExchange.Redis.CommandFlags flags = StackExchange.Redis.CommandFlags.None) -> StackExchange.Redis.StreamEntry[]!
StackExchange.Redis.IDatabase.StreamRead(StackExchange.Redis.StreamPosition[]! streamPositions, int? countPerStream = null, StackExchange.Redis.CommandFlags flags = StackExchange.Redis.CommandFlags.None) -> StackExchange.Redis.RedisStream[]!
Expand Down Expand Up @@ -971,7 +972,8 @@ StackExchange.Redis.IDatabaseAsync.StreamGroupInfoAsync(StackExchange.Redis.Redi
StackExchange.Redis.IDatabaseAsync.StreamInfoAsync(StackExchange.Redis.RedisKey key, StackExchange.Redis.CommandFlags flags = StackExchange.Redis.CommandFlags.None) -> System.Threading.Tasks.Task<StackExchange.Redis.StreamInfo>!
StackExchange.Redis.IDatabaseAsync.StreamLengthAsync(StackExchange.Redis.RedisKey key, StackExchange.Redis.CommandFlags flags = StackExchange.Redis.CommandFlags.None) -> System.Threading.Tasks.Task<long>!
StackExchange.Redis.IDatabaseAsync.StreamPendingAsync(StackExchange.Redis.RedisKey key, StackExchange.Redis.RedisValue groupName, StackExchange.Redis.CommandFlags flags = StackExchange.Redis.CommandFlags.None) -> System.Threading.Tasks.Task<StackExchange.Redis.StreamPendingInfo>!
StackExchange.Redis.IDatabaseAsync.StreamPendingMessagesAsync(StackExchange.Redis.RedisKey key, StackExchange.Redis.RedisValue groupName, int count, StackExchange.Redis.RedisValue consumerName, StackExchange.Redis.RedisValue? minId = null, StackExchange.Redis.RedisValue? maxId = null, StackExchange.Redis.CommandFlags flags = StackExchange.Redis.CommandFlags.None) -> System.Threading.Tasks.Task<StackExchange.Redis.StreamPendingMessageInfo[]!>!
StackExchange.Redis.IDatabaseAsync.StreamPendingMessagesAsync(StackExchange.Redis.RedisKey key, StackExchange.Redis.RedisValue groupName, int count, StackExchange.Redis.RedisValue consumerName, StackExchange.Redis.RedisValue? minId, StackExchange.Redis.RedisValue? maxId, StackExchange.Redis.CommandFlags flags) -> System.Threading.Tasks.Task<StackExchange.Redis.StreamPendingMessageInfo[]!>!
StackExchange.Redis.IDatabaseAsync.StreamPendingMessagesAsync(StackExchange.Redis.RedisKey key, StackExchange.Redis.RedisValue groupName, int count, StackExchange.Redis.RedisValue consumerName, StackExchange.Redis.RedisValue? minId = null, StackExchange.Redis.RedisValue? maxId = null, long? minIdleTimeInMs = null, StackExchange.Redis.CommandFlags flags = StackExchange.Redis.CommandFlags.None) -> System.Threading.Tasks.Task<StackExchange.Redis.StreamPendingMessageInfo[]!>!
StackExchange.Redis.IDatabaseAsync.StreamRangeAsync(StackExchange.Redis.RedisKey key, StackExchange.Redis.RedisValue? minId = null, StackExchange.Redis.RedisValue? maxId = null, int? count = null, StackExchange.Redis.Order messageOrder = StackExchange.Redis.Order.Ascending, StackExchange.Redis.CommandFlags flags = StackExchange.Redis.CommandFlags.None) -> System.Threading.Tasks.Task<StackExchange.Redis.StreamEntry[]!>!
StackExchange.Redis.IDatabaseAsync.StreamReadAsync(StackExchange.Redis.RedisKey key, StackExchange.Redis.RedisValue position, int? count = null, StackExchange.Redis.CommandFlags flags = StackExchange.Redis.CommandFlags.None) -> System.Threading.Tasks.Task<StackExchange.Redis.StreamEntry[]!>!
StackExchange.Redis.IDatabaseAsync.StreamReadAsync(StackExchange.Redis.StreamPosition[]! streamPositions, int? countPerStream = null, StackExchange.Redis.CommandFlags flags = StackExchange.Redis.CommandFlags.None) -> System.Threading.Tasks.Task<StackExchange.Redis.RedisStream[]!>!
Expand Down
45 changes: 35 additions & 10 deletions src/StackExchange.Redis/RedisDatabase.cs
Original file line number Diff line number Diff line change
Expand Up @@ -2803,7 +2803,10 @@ public Task<StreamPendingInfo> StreamPendingAsync(RedisKey key, RedisValue group
return ExecuteAsync(msg, ResultProcessor.StreamPendingInfo);
}

public StreamPendingMessageInfo[] StreamPendingMessages(RedisKey key, RedisValue groupName, int count, RedisValue consumerName, RedisValue? minId = null, RedisValue? maxId = null, CommandFlags flags = CommandFlags.None)
public StreamPendingMessageInfo[] StreamPendingMessages(RedisKey key, RedisValue groupName, int count, RedisValue consumerName, RedisValue? minId = null, RedisValue? maxId = null, CommandFlags flags = CommandFlags.None) =>
StreamPendingMessages(key, groupName, count, consumerName, minId, maxId, null, flags);

public StreamPendingMessageInfo[] StreamPendingMessages(RedisKey key, RedisValue groupName, int count, RedisValue consumerName, RedisValue? minId = null, RedisValue? maxId = null, long? minIdleTimeInMs = null, CommandFlags flags = CommandFlags.None)
{
var msg = GetStreamPendingMessagesMessage(
key,
Expand All @@ -2812,12 +2815,16 @@ public StreamPendingMessageInfo[] StreamPendingMessages(RedisKey key, RedisValue
maxId,
count,
consumerName,
minIdleTimeInMs,
flags);

return ExecuteSync(msg, ResultProcessor.StreamPendingMessages, defaultValue: Array.Empty<StreamPendingMessageInfo>());
}

public Task<StreamPendingMessageInfo[]> StreamPendingMessagesAsync(RedisKey key, RedisValue groupName, int count, RedisValue consumerName, RedisValue? minId = null, RedisValue? maxId = null, CommandFlags flags = CommandFlags.None)
public Task<StreamPendingMessageInfo[]> StreamPendingMessagesAsync(RedisKey key, RedisValue groupName, int count, RedisValue consumerName, RedisValue? minId = null, RedisValue? maxId = null, CommandFlags flags = CommandFlags.None) =>
StreamPendingMessagesAsync(key, groupName, count, consumerName, minId, maxId, null, flags);

public Task<StreamPendingMessageInfo[]> StreamPendingMessagesAsync(RedisKey key, RedisValue groupName, int count, RedisValue consumerName, RedisValue? minId = null, RedisValue? maxId = null, long? minIdleTimeInMs = null, CommandFlags flags = CommandFlags.None)
{
var msg = GetStreamPendingMessagesMessage(
key,
Expand All @@ -2826,6 +2833,7 @@ public Task<StreamPendingMessageInfo[]> StreamPendingMessagesAsync(RedisKey key,
maxId,
count,
consumerName,
minIdleTimeInMs,
flags);

return ExecuteAsync(msg, ResultProcessor.StreamPendingMessages, defaultValue: Array.Empty<StreamPendingMessageInfo>());
Expand Down Expand Up @@ -4300,9 +4308,9 @@ private Message GetStreamCreateConsumerGroupMessage(RedisKey key, RedisValue gro
/// Gets a message for <see href="https://redis.io/commands/xpending/"/>.
/// </summary>
/// <remarks><seealso href="https://redis.io/topics/streams-intro"/></remarks>
private Message GetStreamPendingMessagesMessage(RedisKey key, RedisValue groupName, RedisValue? minId, RedisValue? maxId, int count, RedisValue consumerName, CommandFlags flags)
private Message GetStreamPendingMessagesMessage(RedisKey key, RedisValue groupName, RedisValue? minId, RedisValue? maxId, int count, RedisValue consumerName, long? minIdleTimeInMs, CommandFlags flags)
{
// > XPENDING mystream mygroup - + 10 [consumer name]
// > XPENDING mystream mygroup [IDLE min-idle-time] - + 10 [consumer name]
// 1) 1) 1526569498055 - 0
// 2) "Bob"
// 3) (integer)74170458
Expand All @@ -4316,16 +4324,33 @@ private Message GetStreamPendingMessagesMessage(RedisKey key, RedisValue groupNa
throw new ArgumentOutOfRangeException(nameof(count), "count must be greater than 0.");
}

var values = new RedisValue[consumerName == RedisValue.Null ? 4 : 5];
var valuesLength = 4;
if (consumerName != RedisValue.Null)
{
valuesLength++;
}

values[0] = groupName;
values[1] = minId ?? StreamConstants.ReadMinValue;
values[2] = maxId ?? StreamConstants.ReadMaxValue;
values[3] = count;
if (minIdleTimeInMs is not null)
{
valuesLength += 2;
}
var values = new RedisValue[valuesLength];

var offset = 0;

values[offset++] = groupName;
if (minIdleTimeInMs is not null)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

👍

{
values[offset++] = "IDLE";
values[offset++] = minIdleTimeInMs;
}
values[offset++] = minId ?? StreamConstants.ReadMinValue;
values[offset++] = maxId ?? StreamConstants.ReadMaxValue;
values[offset++] = count;

if (consumerName != RedisValue.Null)
{
values[4] = consumerName;
values[offset++] = consumerName;
}

return Message.Create(
Expand Down
Loading
Loading