Skip to content

Commit

Permalink
Add filtering to KV keys methods (#907)
Browse files Browse the repository at this point in the history
  • Loading branch information
scottf authored Jul 8, 2024
1 parent 0633fb9 commit 26f64a9
Show file tree
Hide file tree
Showing 10 changed files with 93 additions and 20 deletions.
2 changes: 2 additions & 0 deletions src/NATS.Client/Internals/NatsConstants.cs
Original file line number Diff line number Diff line change
Expand Up @@ -32,5 +32,7 @@ internal static class NatsConstants
internal const byte Colon = (byte)':';
internal const byte Cr = (byte)'\r';
internal const byte Lf = (byte)'\n';
internal const string GreaterThan = ">";

}
}
26 changes: 17 additions & 9 deletions src/NATS.Client/JetStream/FeatureBase.cs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
// limitations under the License.

using System;
using System.Collections.Generic;
using NATS.Client.Internals;

namespace NATS.Client.JetStream
Expand Down Expand Up @@ -74,20 +75,27 @@ internal MessageInfo _getBySeq(ulong sequence)
throw;
}
}

internal void VisitSubject(string subject, DeliverPolicy deliverPolicy, bool headersOnly, bool ordered, Action<Msg> action) {

internal void VisitSubject(string subject, DeliverPolicy deliverPolicy, bool headersOnly, bool ordered, Action<Msg> action)
{
VisitSubject(new []{subject}, deliverPolicy, headersOnly, ordered, action);
}

internal void VisitSubject(IList<string> subjects, DeliverPolicy deliverPolicy, bool headersOnly, bool ordered, Action<Msg> action)
{
ConsumerConfiguration.ConsumerConfigurationBuilder ccb = ConsumerConfiguration.Builder()
.WithAckPolicy(AckPolicy.None)
.WithDeliverPolicy(deliverPolicy)
.WithHeadersOnly(headersOnly)
.WithFilterSubjects(subjects);

PushSubscribeOptions pso = PushSubscribeOptions.Builder()
.WithStream(StreamName)
.WithOrdered(ordered)
.WithConfiguration(
ConsumerConfiguration.Builder()
.WithAckPolicy(AckPolicy.None)
.WithDeliverPolicy(deliverPolicy)
.WithHeadersOnly(headersOnly)
.Build())
.WithConfiguration(ccb.Build())
.Build();

IJetStreamPushSyncSubscription sub = js.PushSubscribeSync(subject, pso);
IJetStreamPushSyncSubscription sub = js.PushSubscribeSync(null, pso);
try
{
bool lastTimedOut = false;
Expand Down
2 changes: 1 addition & 1 deletion src/NATS.Client/JetStream/JetStream.cs
Original file line number Diff line number Diff line change
Expand Up @@ -551,7 +551,7 @@ private Boolean IsFilterMatch(string subscribeSubject, string filterSubject, str
return true;
}

if (string.IsNullOrWhiteSpace(filterSubject) || filterSubject.Equals(">")) {
if (string.IsNullOrWhiteSpace(filterSubject) || filterSubject.Equals(NatsConstants.GreaterThan)) {
// lookup stream subject returns null if there is not exactly one subject
string streamSubject = LookupStreamSubject(stream);
return subscribeSubject.Equals(streamSubject);
Expand Down
2 changes: 1 addition & 1 deletion src/NATS.Client/JetStream/JetStreamBase.cs
Original file line number Diff line number Diff line change
Expand Up @@ -127,7 +127,7 @@ internal ConsumerInfo CreateConsumerInternal(string streamName, ConsumerConfigur
}

string fs = config.FilterSubject; // we've already determined not multiple so this gives us 1 or null
if (fs == null || fs.Equals(">"))
if (fs == null || fs.Equals(NatsConstants.GreaterThan))
{
subj = string.Format(JetStreamConstants.JsapiConsumerCreateV290, streamName, consumerName);
}
Expand Down
2 changes: 1 addition & 1 deletion src/NATS.Client/JetStream/OrderedConsumerConfiguration.cs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ namespace NATS.Client.JetStream
{
public sealed class OrderedConsumerConfiguration
{
public const string DefaultFilterSubject = ">";
public const string DefaultFilterSubject = NatsConstants.GreaterThan;

public DeliverPolicy? DeliverPolicy { get; private set; }
public ulong StartSequence { get; private set; }
Expand Down
2 changes: 1 addition & 1 deletion src/NATS.Client/JetStream/StreamInfoOptions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ public StreamInfoOptionsBuilder WithFilterSubjects(string subjectsFilter)
}

public StreamInfoOptionsBuilder WithAllSubjects() {
_subjectsFilter = ">";
_subjectsFilter = NatsConstants.GreaterThan;
return this;
}

Expand Down
16 changes: 16 additions & 0 deletions src/NATS.Client/KeyValue/IKeyValue.cs
Original file line number Diff line number Diff line change
Expand Up @@ -168,6 +168,22 @@ public interface IKeyValue
/// <returns>The list of keys</returns>
IList<string> Keys();

/// <summary>
/// Get a list of the keys in a bucket filtered by a
/// subject-like string, for instance "key" or "key.foo.*" or "key.&gt;"
/// </summary>
/// <param name="filter">the subject like key filter</param>
/// <returns>The list of keys</returns>
IList<string> Keys(string filter);

/// <summary>
/// Get a list of the keys in a bucket filtered by
/// subject-like strings, for instance "aaa.*", "bbb.*;"
/// </summary>
/// <param name="filters">the subject like key filters</param>
/// <returns>The list of keys</returns>
IList<string> Keys(IList<string> filters);

/// <summary>
/// Get the history (list of KeyValueEntry) for a key
/// </summary>
Expand Down
26 changes: 23 additions & 3 deletions src/NATS.Client/KeyValue/KeyValue.cs
Original file line number Diff line number Diff line change
Expand Up @@ -205,13 +205,13 @@ public KeyValueWatchSubscription Watch(IList<string> keys, IKeyValueWatcher watc
public KeyValueWatchSubscription WatchAll(IKeyValueWatcher watcher, params KeyValueWatchOption[] watchOptions)
{
Validator.ValidateNotNull(watcher, "Watcher is required");
return new KeyValueWatchSubscription(this, new List<string> {">"}, watcher, ConsumerConfiguration.UlongUnset, watchOptions);
return new KeyValueWatchSubscription(this, new List<string> {NatsConstants.GreaterThan}, watcher, ConsumerConfiguration.UlongUnset, watchOptions);
}

public KeyValueWatchSubscription WatchAll(IKeyValueWatcher watcher, ulong fromRevision, params KeyValueWatchOption[] watchOptions)
{
Validator.ValidateNotNull(watcher, "Watcher is required");
return new KeyValueWatchSubscription(this, new List<string> {">"}, watcher, fromRevision, watchOptions);
return new KeyValueWatchSubscription(this, new List<string> {NatsConstants.GreaterThan}, watcher, fromRevision, watchOptions);
}

private PublishAck _write(string key, byte[] data, MsgHeader h) {
Expand All @@ -220,9 +220,29 @@ private PublishAck _write(string key, byte[] data, MsgHeader h) {
}

public IList<string> Keys()
{
return _keys(new []{ReadSubject(NatsConstants.GreaterThan)});
}

public IList<string> Keys(string filter)
{
return _keys(new []{ReadSubject(filter)});
}

public IList<string> Keys(IList<string> filters)
{
IList<string> readSubjectFilters = new List<string>(filters.Count);
foreach (string f in filters)
{
readSubjectFilters.Add(ReadSubject(f));
}
return _keys(readSubjectFilters);
}

internal IList<string> _keys(IList<string> readSubjectFilters)
{
IList<string> list = new List<string>();
VisitSubject(ReadSubject(">"), DeliverPolicy.LastPerSubject, true, false, m => {
VisitSubject(readSubjectFilters, DeliverPolicy.LastPerSubject, true, false, m => {
KeyValueOperation op = GetOperation(m.Header, KeyValueOperation.Put);
if (op.Equals(KeyValueOperation.Put)) {
list.Add(new BucketAndKey(m).Key);
Expand Down
2 changes: 1 addition & 1 deletion src/NATS.Client/ObjectStore/ObjectStore.cs
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ internal string RawMetaSubject(string name) {

internal string RawAllMetaSubject()
{
return RawMetaPrefix + ">";
return RawMetaPrefix + NatsConstants.GreaterThan;
}

internal string PubSubMetaSubject(string name) {
Expand Down
33 changes: 30 additions & 3 deletions src/Tests/IntegrationTests/TestKeyValue.cs
Original file line number Diff line number Diff line change
Expand Up @@ -37,9 +37,9 @@ public void TestWorkFlow()
{
DateTime utcNow = DateTime.UtcNow;

string byteKey = "byteKey";
string stringKey = "stringKey";
string longKey = "longKey";
string byteKey = "key.byte" + Variant();
string stringKey = "key.string" + Variant();
string longKey = "key.long" + Variant();
string notFoundKey = "notFound";
string byteValue1 = "Byte Value 1";
string byteValue2 = "Byte Value 2";
Expand Down Expand Up @@ -213,6 +213,9 @@ public void TestWorkFlow()

// should have exactly these 3 keys
AssertKeys(kv.Keys(), byteKey, stringKey, longKey);
AssertKeys(kv.Keys("key.>"), byteKey, stringKey, longKey);
AssertKeys(kv.Keys(byteKey), byteKey);
AssertKeys(kv.Keys(new[]{longKey, stringKey}), longKey, stringKey);

// purge
kv.Purge(longKey);
Expand Down Expand Up @@ -1493,6 +1496,30 @@ public void TestKeyValueTransform() {
// Assert.Null(kv2.Get(key2));
});
}


[Fact]
public void TestSubjectFiltersAgainst209OptOut()
{
Context.RunInJsServer(c =>
{
IKeyValueManagement kvm = c.CreateKeyValueManagementContext();

string bucket = Bucket();
kvm.Create(KeyValueConfiguration.Builder()
.WithName(bucket)
.WithStorageType(StorageType.Memory)
.Build());

JetStreamOptions jso = JetStreamOptions.Builder().WithOptOut290ConsumerCreate(true).Build();
KeyValueOptions kvo = KeyValueOptions.Builder().WithJetStreamOptions(jso).Build();
IKeyValue kv = c.CreateKeyValueContext(bucket, kvo);
kv.Put("one", 1);
kv.Put("two", 2);
AssertKeys(kv.Keys(new []{"one", "two"}), "one", "two");
});
}

}

class TestKeyValueWatcher : IKeyValueWatcher
Expand Down

0 comments on commit 26f64a9

Please sign in to comment.