diff --git a/docs/ReleaseNotes.md b/docs/ReleaseNotes.md index fa2773519..becb27486 100644 --- a/docs/ReleaseNotes.md +++ b/docs/ReleaseNotes.md @@ -8,6 +8,8 @@ Current package versions: ## Unreleased +- Support `XREADGROUP CLAIM` ([#2972 by mgravell](https://github.com/StackExchange/StackExchange.Redis/pull/2972)) + ## 2.9.32 - Fix `SSUBSCRIBE` routing during slot migrations ([#2969 by mgravell](https://github.com/StackExchange/StackExchange.Redis/pull/2969)) diff --git a/src/StackExchange.Redis/APITypes/StreamEntry.cs b/src/StackExchange.Redis/APITypes/StreamEntry.cs index 3f37b9430..8337072e8 100644 --- a/src/StackExchange.Redis/APITypes/StreamEntry.cs +++ b/src/StackExchange.Redis/APITypes/StreamEntry.cs @@ -14,6 +14,19 @@ public StreamEntry(RedisValue id, NameValueEntry[] values) { Id = id; Values = values; + IdleTime = null; + DeliveryCount = 0; + } + + /// + /// Creates an stream entry. + /// + public StreamEntry(RedisValue id, NameValueEntry[] values, TimeSpan? idleTime, int deliveryCount) + { + Id = id; + Values = values; + IdleTime = idleTime; + DeliveryCount = deliveryCount; } /// @@ -51,6 +64,18 @@ public RedisValue this[RedisValue fieldName] } } + /// + /// Delivery count - the number of times this entry has been delivered: 0 for new messages that haven't been delivered before, + /// 1+ for claimed messages (previously unacknowledged entries). + /// + public int DeliveryCount { get; } + + /// + /// Idle time in milliseconds - the number of milliseconds elapsed since this entry was last delivered to a consumer. + /// + /// This member is populated when using XREADGROUP with CLAIM. + public TimeSpan? IdleTime { get; } + /// /// Indicates that the Redis Stream Entry is null. /// diff --git a/src/StackExchange.Redis/Interfaces/IDatabase.cs b/src/StackExchange.Redis/Interfaces/IDatabase.cs index 6c52e89bd..9ee557fee 100644 --- a/src/StackExchange.Redis/Interfaces/IDatabase.cs +++ b/src/StackExchange.Redis/Interfaces/IDatabase.cs @@ -3004,7 +3004,25 @@ IEnumerable SortedSetScan( /// Equivalent of calling XREADGROUP GROUP groupName consumerName COUNT countPerStream STREAMS stream1 stream2 id1 id2. /// /// - RedisStream[] StreamReadGroup(StreamPosition[] streamPositions, RedisValue groupName, RedisValue consumerName, int? countPerStream = null, bool noAck = false, CommandFlags flags = CommandFlags.None); + RedisStream[] StreamReadGroup(StreamPosition[] streamPositions, RedisValue groupName, RedisValue consumerName, int? countPerStream, bool noAck, CommandFlags flags); + + /// + /// Read from multiple streams into the given consumer group. + /// The consumer group with the given will need to have been created for each stream prior to calling this method. + /// + /// Array of streams and the positions from which to begin reading for each stream. + /// The name of the consumer group. + /// The name of the consumer. + /// The maximum number of messages to return from each stream. + /// When true, the message will not be added to the pending message list. + /// Auto-claim messages that have been idle for at least this long. + /// The flags to use for this operation. + /// A value of for each stream. + /// + /// Equivalent of calling XREADGROUP GROUP groupName consumerName COUNT countPerStream STREAMS stream1 stream2 id1 id2. + /// + /// + RedisStream[] StreamReadGroup(StreamPosition[] streamPositions, RedisValue groupName, RedisValue consumerName, int? countPerStream = null, bool noAck = false, TimeSpan? claimMinIdleTime = null, CommandFlags flags = CommandFlags.None); /// /// Trim the stream to a specified maximum length. diff --git a/src/StackExchange.Redis/Interfaces/IDatabaseAsync.cs b/src/StackExchange.Redis/Interfaces/IDatabaseAsync.cs index 0bc7b4867..f4101bca1 100644 --- a/src/StackExchange.Redis/Interfaces/IDatabaseAsync.cs +++ b/src/StackExchange.Redis/Interfaces/IDatabaseAsync.cs @@ -731,7 +731,10 @@ IAsyncEnumerable SortedSetScanAsync( Task StreamReadGroupAsync(StreamPosition[] streamPositions, RedisValue groupName, RedisValue consumerName, int? countPerStream, CommandFlags flags); /// - Task StreamReadGroupAsync(StreamPosition[] streamPositions, RedisValue groupName, RedisValue consumerName, int? countPerStream = null, bool noAck = false, CommandFlags flags = CommandFlags.None); + Task StreamReadGroupAsync(StreamPosition[] streamPositions, RedisValue groupName, RedisValue consumerName, int? countPerStream, bool noAck, CommandFlags flags); + + /// + Task StreamReadGroupAsync(StreamPosition[] streamPositions, RedisValue groupName, RedisValue consumerName, int? countPerStream = null, bool noAck = false, TimeSpan? claimMinIdleTime = null, CommandFlags flags = CommandFlags.None); /// Task StreamTrimAsync(RedisKey key, int maxLength, bool useApproximateMaxLength, CommandFlags flags); diff --git a/src/StackExchange.Redis/KeyspaceIsolation/KeyPrefixed.cs b/src/StackExchange.Redis/KeyspaceIsolation/KeyPrefixed.cs index 61a6f44c4..d1560de5f 100644 --- a/src/StackExchange.Redis/KeyspaceIsolation/KeyPrefixed.cs +++ b/src/StackExchange.Redis/KeyspaceIsolation/KeyPrefixed.cs @@ -696,6 +696,9 @@ public Task StreamReadGroupAsync(StreamPosition[] streamPositions public Task StreamReadGroupAsync(StreamPosition[] streamPositions, RedisValue groupName, RedisValue consumerName, int? countPerStream = null, bool noAck = false, CommandFlags flags = CommandFlags.None) => Inner.StreamReadGroupAsync(streamPositions, groupName, consumerName, countPerStream, noAck, flags); + public Task StreamReadGroupAsync(StreamPosition[] streamPositions, RedisValue groupName, RedisValue consumerName, int? countPerStream = null, bool noAck = false, TimeSpan? claimMinIdleTime = null, CommandFlags flags = CommandFlags.None) => + Inner.StreamReadGroupAsync(streamPositions, groupName, consumerName, countPerStream, noAck, claimMinIdleTime, flags); + public Task StreamTrimAsync(RedisKey key, int maxLength, bool useApproximateMaxLength, CommandFlags flags) => Inner.StreamTrimAsync(ToInner(key), maxLength, useApproximateMaxLength, flags); diff --git a/src/StackExchange.Redis/KeyspaceIsolation/KeyPrefixedDatabase.cs b/src/StackExchange.Redis/KeyspaceIsolation/KeyPrefixedDatabase.cs index 2a139694e..66c8f2c22 100644 --- a/src/StackExchange.Redis/KeyspaceIsolation/KeyPrefixedDatabase.cs +++ b/src/StackExchange.Redis/KeyspaceIsolation/KeyPrefixedDatabase.cs @@ -678,6 +678,9 @@ public RedisStream[] StreamReadGroup(StreamPosition[] streamPositions, RedisValu public RedisStream[] StreamReadGroup(StreamPosition[] streamPositions, RedisValue groupName, RedisValue consumerName, int? countPerStream = null, bool noAck = false, CommandFlags flags = CommandFlags.None) => Inner.StreamReadGroup(streamPositions, groupName, consumerName, countPerStream, noAck, flags); + public RedisStream[] StreamReadGroup(StreamPosition[] streamPositions, RedisValue groupName, RedisValue consumerName, int? countPerStream = null, bool noAck = false, TimeSpan? claimMinIdleTime = null, CommandFlags flags = CommandFlags.None) => + Inner.StreamReadGroup(streamPositions, groupName, consumerName, countPerStream, noAck, claimMinIdleTime, flags); + public long StreamTrim(RedisKey key, int maxLength, bool useApproximateMaxLength, CommandFlags flags) => Inner.StreamTrim(ToInner(key), maxLength, useApproximateMaxLength, flags); diff --git a/src/StackExchange.Redis/PublicAPI/PublicAPI.Shipped.txt b/src/StackExchange.Redis/PublicAPI/PublicAPI.Shipped.txt index 0abb20043..6e825a873 100644 --- a/src/StackExchange.Redis/PublicAPI/PublicAPI.Shipped.txt +++ b/src/StackExchange.Redis/PublicAPI/PublicAPI.Shipped.txt @@ -746,7 +746,6 @@ StackExchange.Redis.IDatabase.StreamRead(StackExchange.Redis.RedisKey key, Stack StackExchange.Redis.IDatabase.StreamRead(StackExchange.Redis.StreamPosition[]! streamPositions, int? countPerStream = null, StackExchange.Redis.CommandFlags flags = StackExchange.Redis.CommandFlags.None) -> StackExchange.Redis.RedisStream[]! StackExchange.Redis.IDatabase.StreamReadGroup(StackExchange.Redis.RedisKey key, StackExchange.Redis.RedisValue groupName, StackExchange.Redis.RedisValue consumerName, StackExchange.Redis.RedisValue? position = null, int? count = null, bool noAck = false, StackExchange.Redis.CommandFlags flags = StackExchange.Redis.CommandFlags.None) -> StackExchange.Redis.StreamEntry[]! StackExchange.Redis.IDatabase.StreamReadGroup(StackExchange.Redis.RedisKey key, StackExchange.Redis.RedisValue groupName, StackExchange.Redis.RedisValue consumerName, StackExchange.Redis.RedisValue? position, int? count, StackExchange.Redis.CommandFlags flags) -> StackExchange.Redis.StreamEntry[]! -StackExchange.Redis.IDatabase.StreamReadGroup(StackExchange.Redis.StreamPosition[]! streamPositions, StackExchange.Redis.RedisValue groupName, StackExchange.Redis.RedisValue consumerName, int? countPerStream = null, bool noAck = false, StackExchange.Redis.CommandFlags flags = StackExchange.Redis.CommandFlags.None) -> StackExchange.Redis.RedisStream[]! StackExchange.Redis.IDatabase.StreamReadGroup(StackExchange.Redis.StreamPosition[]! streamPositions, StackExchange.Redis.RedisValue groupName, StackExchange.Redis.RedisValue consumerName, int? countPerStream, StackExchange.Redis.CommandFlags flags) -> StackExchange.Redis.RedisStream[]! StackExchange.Redis.IDatabase.StreamTrim(StackExchange.Redis.RedisKey key, int maxLength, bool useApproximateMaxLength, StackExchange.Redis.CommandFlags flags) -> long StackExchange.Redis.IDatabase.StreamTrim(StackExchange.Redis.RedisKey key, long maxLength, bool useApproximateMaxLength = false, long? limit = null, StackExchange.Redis.StreamTrimMode mode = StackExchange.Redis.StreamTrimMode.KeepReferences, StackExchange.Redis.CommandFlags flags = StackExchange.Redis.CommandFlags.None) -> long @@ -991,7 +990,6 @@ StackExchange.Redis.IDatabaseAsync.StreamReadAsync(StackExchange.Redis.RedisKey StackExchange.Redis.IDatabaseAsync.StreamReadAsync(StackExchange.Redis.StreamPosition[]! streamPositions, int? countPerStream = null, StackExchange.Redis.CommandFlags flags = StackExchange.Redis.CommandFlags.None) -> System.Threading.Tasks.Task! StackExchange.Redis.IDatabaseAsync.StreamReadGroupAsync(StackExchange.Redis.RedisKey key, StackExchange.Redis.RedisValue groupName, StackExchange.Redis.RedisValue consumerName, StackExchange.Redis.RedisValue? position = null, int? count = null, bool noAck = false, StackExchange.Redis.CommandFlags flags = StackExchange.Redis.CommandFlags.None) -> System.Threading.Tasks.Task! StackExchange.Redis.IDatabaseAsync.StreamReadGroupAsync(StackExchange.Redis.RedisKey key, StackExchange.Redis.RedisValue groupName, StackExchange.Redis.RedisValue consumerName, StackExchange.Redis.RedisValue? position, int? count, StackExchange.Redis.CommandFlags flags) -> System.Threading.Tasks.Task! -StackExchange.Redis.IDatabaseAsync.StreamReadGroupAsync(StackExchange.Redis.StreamPosition[]! streamPositions, StackExchange.Redis.RedisValue groupName, StackExchange.Redis.RedisValue consumerName, int? countPerStream = null, bool noAck = false, StackExchange.Redis.CommandFlags flags = StackExchange.Redis.CommandFlags.None) -> System.Threading.Tasks.Task! StackExchange.Redis.IDatabaseAsync.StreamReadGroupAsync(StackExchange.Redis.StreamPosition[]! streamPositions, StackExchange.Redis.RedisValue groupName, StackExchange.Redis.RedisValue consumerName, int? countPerStream, StackExchange.Redis.CommandFlags flags) -> System.Threading.Tasks.Task! StackExchange.Redis.IDatabaseAsync.StreamTrimAsync(StackExchange.Redis.RedisKey key, int maxLength, bool useApproximateMaxLength, StackExchange.Redis.CommandFlags flags) -> System.Threading.Tasks.Task! StackExchange.Redis.IDatabaseAsync.StreamTrimAsync(StackExchange.Redis.RedisKey key, long maxLength, bool useApproximateMaxLength = false, long? limit = null, StackExchange.Redis.StreamTrimMode mode = StackExchange.Redis.StreamTrimMode.KeepReferences, StackExchange.Redis.CommandFlags flags = StackExchange.Redis.CommandFlags.None) -> System.Threading.Tasks.Task! @@ -2052,3 +2050,10 @@ StackExchange.Redis.IServer.ExecuteAsync(int? database, string! command, System. [SER001]static StackExchange.Redis.VectorSetSimilaritySearchRequest.ByMember(StackExchange.Redis.RedisValue member) -> StackExchange.Redis.VectorSetSimilaritySearchRequest! [SER001]static StackExchange.Redis.VectorSetSimilaritySearchRequest.ByVector(System.ReadOnlyMemory vector) -> StackExchange.Redis.VectorSetSimilaritySearchRequest! StackExchange.Redis.RedisChannel.WithKeyRouting() -> StackExchange.Redis.RedisChannel +StackExchange.Redis.IDatabase.StreamReadGroup(StackExchange.Redis.StreamPosition[]! streamPositions, StackExchange.Redis.RedisValue groupName, StackExchange.Redis.RedisValue consumerName, int? countPerStream = null, bool noAck = false, System.TimeSpan? claimMinIdleTime = null, StackExchange.Redis.CommandFlags flags = StackExchange.Redis.CommandFlags.None) -> StackExchange.Redis.RedisStream[]! +StackExchange.Redis.IDatabase.StreamReadGroup(StackExchange.Redis.StreamPosition[]! streamPositions, StackExchange.Redis.RedisValue groupName, StackExchange.Redis.RedisValue consumerName, int? countPerStream, bool noAck, StackExchange.Redis.CommandFlags flags) -> StackExchange.Redis.RedisStream[]! +StackExchange.Redis.IDatabaseAsync.StreamReadGroupAsync(StackExchange.Redis.StreamPosition[]! streamPositions, StackExchange.Redis.RedisValue groupName, StackExchange.Redis.RedisValue consumerName, int? countPerStream = null, bool noAck = false, System.TimeSpan? claimMinIdleTime = null, StackExchange.Redis.CommandFlags flags = StackExchange.Redis.CommandFlags.None) -> System.Threading.Tasks.Task! +StackExchange.Redis.IDatabaseAsync.StreamReadGroupAsync(StackExchange.Redis.StreamPosition[]! streamPositions, StackExchange.Redis.RedisValue groupName, StackExchange.Redis.RedisValue consumerName, int? countPerStream, bool noAck, StackExchange.Redis.CommandFlags flags) -> System.Threading.Tasks.Task! +StackExchange.Redis.StreamEntry.DeliveryCount.get -> int +StackExchange.Redis.StreamEntry.IdleTime.get -> System.TimeSpan? +StackExchange.Redis.StreamEntry.StreamEntry(StackExchange.Redis.RedisValue id, StackExchange.Redis.NameValueEntry[]! values, System.TimeSpan? idleTime, int deliveryCount) -> void diff --git a/src/StackExchange.Redis/RedisDatabase.cs b/src/StackExchange.Redis/RedisDatabase.cs index bcda4146b..82704e0c1 100644 --- a/src/StackExchange.Redis/RedisDatabase.cs +++ b/src/StackExchange.Redis/RedisDatabase.cs @@ -3319,17 +3319,26 @@ public Task StreamReadGroupAsync(RedisKey key, RedisValue groupNa } public RedisStream[] StreamReadGroup(StreamPosition[] streamPositions, RedisValue groupName, RedisValue consumerName, int? countPerStream, CommandFlags flags) - { - return StreamReadGroup( + => StreamReadGroup( streamPositions, groupName, consumerName, countPerStream, false, + null, + flags); + + public RedisStream[] StreamReadGroup(StreamPosition[] streamPositions, RedisValue groupName, RedisValue consumerName, int? countPerStream, bool noAck, CommandFlags flags) + => StreamReadGroup( + streamPositions, + groupName, + consumerName, + countPerStream, + noAck, + null, flags); - } - public RedisStream[] StreamReadGroup(StreamPosition[] streamPositions, RedisValue groupName, RedisValue consumerName, int? countPerStream = null, bool noAck = false, CommandFlags flags = CommandFlags.None) + public RedisStream[] StreamReadGroup(StreamPosition[] streamPositions, RedisValue groupName, RedisValue consumerName, int? countPerStream = null, bool noAck = false, TimeSpan? claimMinIdleTime = null, CommandFlags flags = CommandFlags.None) { var msg = GetMultiStreamReadGroupMessage( streamPositions, @@ -3337,23 +3346,33 @@ public RedisStream[] StreamReadGroup(StreamPosition[] streamPositions, RedisValu consumerName, countPerStream, noAck, + claimMinIdleTime, flags); return ExecuteSync(msg, ResultProcessor.MultiStream, defaultValue: Array.Empty()); } public Task StreamReadGroupAsync(StreamPosition[] streamPositions, RedisValue groupName, RedisValue consumerName, int? countPerStream, CommandFlags flags) - { - return StreamReadGroupAsync( + => StreamReadGroupAsync( streamPositions, groupName, consumerName, countPerStream, false, + null, + flags); + + public Task StreamReadGroupAsync(StreamPosition[] streamPositions, RedisValue groupName, RedisValue consumerName, int? countPerStream, bool noAck, CommandFlags flags) + => StreamReadGroupAsync( + streamPositions, + groupName, + consumerName, + countPerStream, + noAck, + null, flags); - } - public Task StreamReadGroupAsync(StreamPosition[] streamPositions, RedisValue groupName, RedisValue consumerName, int? countPerStream = null, bool noAck = false, CommandFlags flags = CommandFlags.None) + public Task StreamReadGroupAsync(StreamPosition[] streamPositions, RedisValue groupName, RedisValue consumerName, int? countPerStream = null, bool noAck = false, TimeSpan? claimMinIdleTime = null, CommandFlags flags = CommandFlags.None) { var msg = GetMultiStreamReadGroupMessage( streamPositions, @@ -3361,6 +3380,7 @@ public Task StreamReadGroupAsync(StreamPosition[] streamPositions consumerName, countPerStream, noAck, + claimMinIdleTime, flags); return ExecuteAsync(msg, ResultProcessor.MultiStream, defaultValue: Array.Empty()); @@ -3992,7 +4012,7 @@ internal static RedisValue GetLexRange(RedisValue value, Exclude exclude, bool i return result; } - private Message GetMultiStreamReadGroupMessage(StreamPosition[] streamPositions, RedisValue groupName, RedisValue consumerName, int? countPerStream, bool noAck, CommandFlags flags) => + private Message GetMultiStreamReadGroupMessage(StreamPosition[] streamPositions, RedisValue groupName, RedisValue consumerName, int? countPerStream, bool noAck, TimeSpan? claimMinIdleTime, CommandFlags flags) => new MultiStreamReadGroupCommandMessage( Database, flags, @@ -4000,7 +4020,8 @@ private Message GetMultiStreamReadGroupMessage(StreamPosition[] streamPositions, groupName, consumerName, countPerStream, - noAck); + noAck, + claimMinIdleTime); private sealed class MultiStreamReadGroupCommandMessage : Message // XREADGROUP with multiple stream. Example: XREADGROUP GROUP groupName consumerName COUNT countPerStream STREAMS stream1 stream2 id1 id2 { @@ -4010,8 +4031,9 @@ private sealed class MultiStreamReadGroupCommandMessage : Message // XREADGROUP private readonly int? countPerStream; private readonly bool noAck; private readonly int argCount; + private readonly TimeSpan? claimMinIdleTime; - public MultiStreamReadGroupCommandMessage(int db, CommandFlags flags, StreamPosition[] streamPositions, RedisValue groupName, RedisValue consumerName, int? countPerStream, bool noAck) + public MultiStreamReadGroupCommandMessage(int db, CommandFlags flags, StreamPosition[] streamPositions, RedisValue groupName, RedisValue consumerName, int? countPerStream, bool noAck, TimeSpan? claimMinIdleTime) : base(db, flags, RedisCommand.XREADGROUP) { if (streamPositions == null) throw new ArgumentNullException(nameof(streamPositions)); @@ -4034,11 +4056,13 @@ public MultiStreamReadGroupCommandMessage(int db, CommandFlags flags, StreamPosi this.consumerName = consumerName; this.countPerStream = countPerStream; this.noAck = noAck; + this.claimMinIdleTime = claimMinIdleTime; argCount = 4 // Room for GROUP groupName consumerName & STREAMS + (streamPositions.Length * 2) // Enough room for the stream keys and associated IDs. + (countPerStream.HasValue ? 2 : 0) // Room for "COUNT num" or 0 if countPerStream is null. - + (noAck ? 1 : 0); // Allow for the NOACK subcommand. + + (noAck ? 1 : 0) // Allow for the NOACK subcommand. + + (claimMinIdleTime.HasValue ? 2 : 0); // Allow for the CLAIM {minIdleTime} subcommand. } public override int GetHashSlot(ServerSelectionStrategy serverSelectionStrategy) @@ -4069,6 +4093,12 @@ protected override void WriteImpl(PhysicalConnection physical) physical.WriteBulkString(StreamConstants.NoAck); } + if (claimMinIdleTime.HasValue) + { + physical.WriteBulkString(StreamConstants.Claim); + physical.WriteBulkString(claimMinIdleTime.Value.TotalMilliseconds); + } + physical.WriteBulkString(StreamConstants.Streams); for (int i = 0; i < streamPositions.Length; i++) { diff --git a/src/StackExchange.Redis/RedisFeatures.cs b/src/StackExchange.Redis/RedisFeatures.cs index 87bcbf20c..1218a9f80 100644 --- a/src/StackExchange.Redis/RedisFeatures.cs +++ b/src/StackExchange.Redis/RedisFeatures.cs @@ -46,7 +46,8 @@ namespace StackExchange.Redis v7_4_0_rc1 = new Version(7, 3, 240), // 7.4 RC1 is version 7.3.240 v7_4_0_rc2 = new Version(7, 3, 241), // 7.4 RC2 is version 7.3.241 v8_0_0_M04 = new Version(7, 9, 227), // 8.0 M04 is version 7.9.227 - v8_2_0_rc1 = new Version(8, 1, 240); // 8.2 RC1 is version 8.1.240 + v8_2_0_rc1 = new Version(8, 1, 240), // 8.2 RC1 is version 8.1.240 + v8_4_0_rc1 = new Version(8, 3, 224); // 8.2 RC1 is version 8.3.224 #pragma warning restore SA1310 // Field names should not contain underscore #pragma warning restore SA1311 // Static readonly fields should begin with upper-case letter diff --git a/src/StackExchange.Redis/ResultProcessor.cs b/src/StackExchange.Redis/ResultProcessor.cs index 650cba603..196cabde5 100644 --- a/src/StackExchange.Redis/ResultProcessor.cs +++ b/src/StackExchange.Redis/ResultProcessor.cs @@ -2215,6 +2215,8 @@ Multibulk array. 2) "Jane" 3) "surname" 4) "Austen" + + (note that XREADGROUP may include additional interior elements; see ParseRedisStreamEntries) */ protected override bool SetResultCore(PhysicalConnection connection, Message message, in RawResult result) @@ -2683,11 +2685,25 @@ protected static StreamEntry ParseRedisStreamEntry(in RawResult item) // Process the Multibulk array for each entry. The entry contains the following elements: // [0] = SimpleString (the ID of the stream entry) // [1] = Multibulk array of the name/value pairs of the stream entry's data + // optional (XREADGROUP with CLAIM): + // [2] = idle time (in milliseconds) + // [3] = delivery count var entryDetails = item.GetItems(); + var id = entryDetails[0].AsRedisValue(); + var values = ParseStreamEntryValues(entryDetails[1]); + // check for optional fields (XREADGROUP with CLAIM) + if (entryDetails.Length >= 4 && entryDetails[2].TryGetInt64(out var idleTimeInMs) && entryDetails[3].TryGetInt64(out var deliveryCount)) + { + return new StreamEntry( + id: id, + values: values, + idleTime: TimeSpan.FromMilliseconds(idleTimeInMs), + deliveryCount: checked((int)deliveryCount)); + } return new StreamEntry( - id: entryDetails[0].AsRedisValue(), - values: ParseStreamEntryValues(entryDetails[1])); + id: id, + values: values); } protected internal StreamEntry[] ParseRedisStreamEntries(in RawResult result) => result.GetItems().ToArray((in RawResult item, in StreamProcessorBase _) => ParseRedisStreamEntry(item), this); diff --git a/src/StackExchange.Redis/StreamConstants.cs b/src/StackExchange.Redis/StreamConstants.cs index 929398e4b..1c0c11d00 100644 --- a/src/StackExchange.Redis/StreamConstants.cs +++ b/src/StackExchange.Redis/StreamConstants.cs @@ -66,6 +66,7 @@ internal static class StreamConstants internal static readonly RedisValue MkStream = "MKSTREAM"; internal static readonly RedisValue NoAck = "NOACK"; + internal static readonly RedisValue Claim = "CLAIM"; internal static readonly RedisValue Stream = "STREAM"; diff --git a/tests/StackExchange.Redis.Tests/StreamTests.cs b/tests/StackExchange.Redis.Tests/StreamTests.cs index 58d2bb1fb..06a3599c9 100644 --- a/tests/StackExchange.Redis.Tests/StreamTests.cs +++ b/tests/StackExchange.Redis.Tests/StreamTests.cs @@ -495,8 +495,8 @@ public async Task StreamConsumerGroupSetId() var db = conn.GetDatabase(); var key = Me(); - const string groupName = "test_group", - consumer = "consumer"; + await db.KeyDeleteAsync(key, CommandFlags.FireAndForget); + const string groupName = "test_group", consumer = "consumer"; // Create a stream db.StreamAdd(key, "field1", "value1"); @@ -519,6 +519,56 @@ public async Task StreamConsumerGroupSetId() Assert.Equal(2, secondRead.Length); } + [Fact] + public async Task StreamConsumerGroupAutoClaim() + { + await using var conn = Create(require: RedisFeatures.v8_4_0_rc1); + + var db = conn.GetDatabase(); + var key = Me(); + await db.KeyDeleteAsync(key, CommandFlags.FireAndForget); + const string groupName = "test_group", consumer = "consumer"; + + // Create a group and set the position to deliver new messages only. + await db.StreamCreateConsumerGroupAsync(key, groupName, StreamPosition.NewMessages); + + // add some entries + await db.StreamAddAsync(key, "field1", "value1"); + await db.StreamAddAsync(key, "field2", "value2"); + + var idleTime = TimeSpan.FromMilliseconds(100); + // Read into the group, expect the two entries; we don't expect any data + // here, at least on a fast server, because it hasn't been idle long enough. + StreamPosition[] positions = [new(key, StreamPosition.NewMessages)]; + var groups = await db.StreamReadGroupAsync(positions, groupName, consumer, noAck: false, countPerStream: 10, claimMinIdleTime: idleTime); + var grp = Assert.Single(groups); + Assert.Equal(key, grp.Key); + Assert.Equal(2, grp.Entries.Length); + foreach (var entry in grp.Entries) + { + Assert.Equal(0, entry.DeliveryCount); // never delivered before + Assert.Equal(TimeSpan.Zero, entry.IdleTime); // never delivered before + } + + // now repeat immediately; we didn't "ack", so they're still pending, but not idle long enough + groups = await db.StreamReadGroupAsync(positions, groupName, consumer, noAck: false, countPerStream: 10, claimMinIdleTime: idleTime); + Assert.Empty(groups); // nothing available from any group + + // wait long enough for the messages to be considered idle + await Task.Delay(idleTime + idleTime); + + // repeat again; we should get the entries + groups = await db.StreamReadGroupAsync(positions, groupName, consumer, noAck: false, countPerStream: 10, claimMinIdleTime: idleTime); + grp = Assert.Single(groups); + Assert.Equal(key, grp.Key); + Assert.Equal(2, grp.Entries.Length); + foreach (var entry in grp.Entries) + { + Assert.Equal(1, entry.DeliveryCount); // this is a redelivery + Assert.True(entry.IdleTime > TimeSpan.Zero); // and is considered idle + } + } + [Fact] public async Task StreamConsumerGroupWithNoConsumers() {