Skip to content

Add xtrim with minid and new 8.2 stream features #2912

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 14 commits into from
Jul 21, 2025
1 change: 1 addition & 0 deletions StackExchange.Redis.sln
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "Solution Items", "Solution
docs\ReleaseNotes.md = docs\ReleaseNotes.md
Shared.ruleset = Shared.ruleset
version.json = version.json
tests\RedisConfigs\docker-compose.yml = tests\RedisConfigs\docker-compose.yml
EndProjectSection
EndProject
Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "RedisConfigs", "RedisConfigs", "{96E891CD-2ED7-4293-A7AB-4C6F5D8D2B05}"
Expand Down
2 changes: 2 additions & 0 deletions docs/ReleaseNotes.md
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@ Current package versions:
- Package updates ([#2906 by mgravell](https://github.com/StackExchange/StackExchange.Redis/pull/2906))
- Docs: added [guidance on async timeouts](https://stackexchange.github.io/StackExchange.Redis/AsyncTimeouts) ([#2910 by mgravell](https://github.com/StackExchange/StackExchange.Redis/pull/2910))
- Fix handshake error with `CLIENT ID` ([#2909 by mgravell](https://github.com/StackExchange/StackExchange.Redis/pull/2909))
- Add `XTRIM MINID` support ([#2842 by kijanawoodard](https://github.com/StackExchange/StackExchange.Redis/pull/2842))
- Add new CE 8.2 stream support - `XDELEX`, `XACKDEL`, `{XADD|XTRIM} [KEEPREF|DELREF|ACKED]` ([#2912 by mgravell](https://github.com/StackExchange/StackExchange.Redis/pull/2912))

## 2.8.41

Expand Down
4 changes: 4 additions & 0 deletions src/StackExchange.Redis/Enums/RedisCommand.cs
Original file line number Diff line number Diff line change
Expand Up @@ -206,10 +206,12 @@ internal enum RedisCommand
WATCH,

XACK,
XACKDEL,
XADD,
XAUTOCLAIM,
XCLAIM,
XDEL,
XDELEX,
XGROUP,
XINFO,
XLEN,
Expand Down Expand Up @@ -496,9 +498,11 @@ internal static bool IsPrimaryOnly(this RedisCommand command)
case RedisCommand.GEOADD:
case RedisCommand.SORT:
case RedisCommand.XACK:
case RedisCommand.XACKDEL:
case RedisCommand.XADD:
case RedisCommand.XCLAIM:
case RedisCommand.XDEL:
case RedisCommand.XDELEX:
case RedisCommand.XGROUP:
case RedisCommand.XREADGROUP:
case RedisCommand.XTRIM:
Expand Down
24 changes: 24 additions & 0 deletions src/StackExchange.Redis/Enums/StreamTrimMode.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
namespace StackExchange.Redis;

/// <summary>
/// Determines how stream trimming works.
/// </summary>
public enum StreamTrimMode
{
/// <summary>
/// Trims the stream according to the specified policy (MAXLEN or MINID) regardless of whether entries are referenced by any consumer groups, but preserves existing references to these entries in all consumer groups' PEL.
/// </summary>
KeepReferences = 0,

/// <summary>
/// Trims the stream according to the specified policy and also removes all references to the trimmed entries from all consumer groups' PEL.
/// </summary>
/// <remarks>Requires server 8.2 or above.</remarks>
DeleteReferences = 1,

/// <summary>
/// With ACKED: Only trims entries that were read and acknowledged by all consumer groups.
/// </summary>
/// <remarks>Requires server 8.2 or above.</remarks>
Acknowledged = 2,
}
23 changes: 23 additions & 0 deletions src/StackExchange.Redis/Enums/StreamTrimResult.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
namespace StackExchange.Redis;

/// <summary>
/// Determines how stream trimming works.
/// </summary>
public enum StreamTrimResult
{
/// <summary>
/// No such id exists in the provided stream key.
/// </summary>
NotFound = -1,

/// <summary>
/// Entry was deleted from the stream.
/// </summary>
Deleted = 1,

/// <summary>
/// Entry was not deleted, but there are still dangling references.
/// </summary>
/// <remarks>This response relates to the <see cref="StreamTrimMode.Acknowledged"/> mode.</remarks>
NotDeleted = 2,
}
114 changes: 111 additions & 3 deletions src/StackExchange.Redis/Interfaces/IDatabase.cs
Original file line number Diff line number Diff line change
Expand Up @@ -2440,6 +2440,34 @@ IEnumerable<SortedSetEntry> SortedSetScan(
/// <remarks><seealso href="https://redis.io/topics/streams-intro"/></remarks>
long StreamAcknowledge(RedisKey key, RedisValue groupName, RedisValue[] messageIds, CommandFlags flags = CommandFlags.None);

/// <summary>
/// Allow the consumer to mark a pending message as correctly processed. Returns the number of messages acknowledged.
/// </summary>
/// <param name="key">The key of the stream.</param>
/// <param name="groupName">The name of the consumer group that received the message.</param>
/// <param name="mode">The delete mode to use when acknowledging the message.</param>
/// <param name="messageId">The ID of the message to acknowledge.</param>
/// <param name="flags">The flags to use for this operation.</param>
/// <returns>The outcome of the delete operation.</returns>
/// <remarks><seealso href="https://redis.io/topics/streams-intro"/></remarks>
#pragma warning disable RS0026 // similar overloads
StreamTrimResult StreamAcknowledgeAndDelete(RedisKey key, RedisValue groupName, StreamTrimMode mode, RedisValue messageId, CommandFlags flags = CommandFlags.None);
#pragma warning restore RS0026

/// <summary>
/// Allow the consumer to mark a pending message as correctly processed. Returns the number of messages acknowledged.
/// </summary>
/// <param name="key">The key of the stream.</param>
/// <param name="groupName">The name of the consumer group that received the message.</param>
/// /// <param name="mode">The delete mode to use when acknowledging the message.</param>
/// <param name="messageIds">The IDs of the messages to acknowledge.</param>
/// <param name="flags">The flags to use for this operation.</param>
/// <returns>The outcome of each delete operation.</returns>
/// <remarks><seealso href="https://redis.io/topics/streams-intro"/></remarks>
#pragma warning disable RS0026 // similar overloads
StreamTrimResult[] StreamAcknowledgeAndDelete(RedisKey key, RedisValue groupName, StreamTrimMode mode, RedisValue[] messageIds, CommandFlags flags = CommandFlags.None);
#pragma warning restore RS0026

/// <summary>
/// Adds an entry using the specified values to the given stream key.
/// If key does not exist, a new key holding a stream is created.
Expand All @@ -2454,7 +2482,7 @@ IEnumerable<SortedSetEntry> SortedSetScan(
/// <param name="flags">The flags to use for this operation.</param>
/// <returns>The ID of the newly created message.</returns>
/// <remarks><seealso href="https://redis.io/commands/xadd"/></remarks>
RedisValue StreamAdd(RedisKey key, RedisValue streamField, RedisValue streamValue, RedisValue? messageId = null, int? maxLength = null, bool useApproximateMaxLength = false, CommandFlags flags = CommandFlags.None);
RedisValue StreamAdd(RedisKey key, RedisValue streamField, RedisValue streamValue, RedisValue? messageId, int? maxLength, bool useApproximateMaxLength, CommandFlags flags);

/// <summary>
/// Adds an entry using the specified values to the given stream key.
Expand All @@ -2469,7 +2497,46 @@ IEnumerable<SortedSetEntry> SortedSetScan(
/// <param name="flags">The flags to use for this operation.</param>
/// <returns>The ID of the newly created message.</returns>
/// <remarks><seealso href="https://redis.io/commands/xadd"/></remarks>
RedisValue StreamAdd(RedisKey key, NameValueEntry[] streamPairs, RedisValue? messageId = null, int? maxLength = null, bool useApproximateMaxLength = false, CommandFlags flags = CommandFlags.None);
RedisValue StreamAdd(RedisKey key, NameValueEntry[] streamPairs, RedisValue? messageId, int? maxLength, bool useApproximateMaxLength, CommandFlags flags);

/// <summary>
/// Adds an entry using the specified values to the given stream key.
/// If key does not exist, a new key holding a stream is created.
/// The command returns the ID of the newly created stream entry.
/// </summary>
/// <param name="key">The key of the stream.</param>
/// <param name="streamField">The field name for the stream entry.</param>
/// <param name="streamValue">The value to set in the stream entry.</param>
/// <param name="messageId">The ID to assign to the stream entry, defaults to an auto-generated ID ("*").</param>
/// <param name="maxLength">The maximum length of the stream.</param>
/// <param name="useApproximateMaxLength">If true, the "~" argument is used to allow the stream to exceed max length by a small number. This improves performance when removing messages.</param>
/// <param name="limit">Specifies the maximal count of entries that will be evicted.</param>
/// <param name="trimMode">Determines how stream trimming should be performed.</param>
/// <param name="flags">The flags to use for this operation.</param>
/// <returns>The ID of the newly created message.</returns>
/// <remarks><seealso href="https://redis.io/commands/xadd"/></remarks>
#pragma warning disable RS0026 // different shape
RedisValue StreamAdd(RedisKey key, RedisValue streamField, RedisValue streamValue, RedisValue? messageId = null, long? maxLength = null, bool useApproximateMaxLength = false, long? limit = null, StreamTrimMode trimMode = StreamTrimMode.KeepReferences, CommandFlags flags = CommandFlags.None);
#pragma warning restore RS0026

/// <summary>
/// Adds an entry using the specified values to the given stream key.
/// If key does not exist, a new key holding a stream is created.
/// The command returns the ID of the newly created stream entry.
/// </summary>
/// <param name="key">The key of the stream.</param>
/// <param name="streamPairs">The fields and their associated values to set in the stream entry.</param>
/// <param name="messageId">The ID to assign to the stream entry, defaults to an auto-generated ID ("*").</param>
/// <param name="maxLength">The maximum length of the stream.</param>
/// <param name="useApproximateMaxLength">If true, the "~" argument is used to allow the stream to exceed max length by a small number. This improves performance when removing messages.</param>
/// <param name="limit">Specifies the maximal count of entries that will be evicted.</param>
/// <param name="trimMode">Determines how stream trimming should be performed.</param>
/// <param name="flags">The flags to use for this operation.</param>
/// <returns>The ID of the newly created message.</returns>
/// <remarks><seealso href="https://redis.io/commands/xadd"/></remarks>
#pragma warning disable RS0026 // different shape
RedisValue StreamAdd(RedisKey key, NameValueEntry[] streamPairs, RedisValue? messageId = null, long? maxLength = null, bool useApproximateMaxLength = false, long? limit = null, StreamTrimMode trimMode = StreamTrimMode.KeepReferences, CommandFlags flags = CommandFlags.None);
#pragma warning restore RS0026

/// <summary>
/// Change ownership of messages consumed, but not yet acknowledged, by a different consumer.
Expand Down Expand Up @@ -2583,7 +2650,22 @@ IEnumerable<SortedSetEntry> SortedSetScan(
/// <param name="flags">The flags to use for this operation.</param>
/// <returns>Returns the number of messages successfully deleted from the stream.</returns>
/// <remarks><seealso href="https://redis.io/topics/streams-intro"/></remarks>
#pragma warning disable RS0026 // similar overloads
long StreamDelete(RedisKey key, RedisValue[] messageIds, CommandFlags flags = CommandFlags.None);
#pragma warning restore RS0026

/// <summary>
/// Delete messages in the stream. This method does not delete the stream.
/// </summary>
/// <param name="key">The key of the stream.</param>
/// <param name="messageIds">The IDs of the messages to delete.</param>
/// <param name="mode">Determines how stream trimming should be performed.</param>
/// <param name="flags">The flags to use for this operation.</param>
/// <returns>Returns the number of messages successfully deleted from the stream.</returns>
/// <remarks><seealso href="https://redis.io/topics/streams-intro"/></remarks>
#pragma warning disable RS0026 // similar overloads
StreamTrimResult[] StreamDelete(RedisKey key, RedisValue[] messageIds, StreamTrimMode mode, CommandFlags flags = CommandFlags.None);
#pragma warning restore RS0026

/// <summary>
/// Delete a consumer from a consumer group.
Expand Down Expand Up @@ -2773,7 +2855,33 @@ IEnumerable<SortedSetEntry> SortedSetScan(
/// <param name="flags">The flags to use for this operation.</param>
/// <returns>The number of messages removed from the stream.</returns>
/// <remarks><seealso href="https://redis.io/topics/streams-intro"/></remarks>
long StreamTrim(RedisKey key, int maxLength, bool useApproximateMaxLength = false, CommandFlags flags = CommandFlags.None);
long StreamTrim(RedisKey key, int maxLength, bool useApproximateMaxLength, CommandFlags flags);

/// <summary>
/// Trim the stream to a specified maximum length.
/// </summary>
/// <param name="key">The key of the stream.</param>
/// <param name="maxLength">The maximum length of the stream.</param>
/// <param name="useApproximateMaxLength">If true, the "~" argument is used to allow the stream to exceed max length by a small number. This improves performance when removing messages.</param>
/// <param name="limit">Specifies the maximal count of entries that will be evicted.</param>
/// <param name="mode">Determines how stream trimming should be performed.</param>
/// <param name="flags">The flags to use for this operation.</param>
/// <returns>The number of messages removed from the stream.</returns>
/// <remarks><seealso href="https://redis.io/topics/streams-intro"/></remarks>
long StreamTrim(RedisKey key, long maxLength, bool useApproximateMaxLength = false, long? limit = null, StreamTrimMode mode = StreamTrimMode.KeepReferences, CommandFlags flags = CommandFlags.None);

/// <summary>
/// Trim the stream to a specified minimum timestamp.
/// </summary>
/// <param name="key">The key of the stream.</param>
/// <param name="minId">All entries with an id (timestamp) earlier minId will be removed.</param>
/// <param name="useApproximateMaxLength">If true, the "~" argument is used to allow the stream to exceed minId by a small number. This improves performance when removing messages.</param>
/// <param name="limit">The maximum number of entries to remove per call when useApproximateMaxLength = true. If 0, the limiting mechanism is disabled entirely.</param>
/// <param name="mode">Determines how stream trimming should be performed.</param>
/// <param name="flags">The flags to use for this operation.</param>
/// <returns>The number of messages removed from the stream.</returns>
/// <remarks><seealso href="https://redis.io/topics/streams-intro"/></remarks>
long StreamTrimByMinId(RedisKey key, RedisValue minId, bool useApproximateMaxLength = false, long? limit = null, StreamTrimMode mode = StreamTrimMode.KeepReferences, CommandFlags flags = CommandFlags.None);

/// <summary>
/// If key already exists and is a string, this command appends the value at the end of the string.
Expand Down
Loading
Loading