Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Key/Value and Object Store compression #214

Merged
merged 3 commits into from
Nov 16, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions src/NATS.Client.KeyValueStore/NatsKVConfig.cs
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,11 @@ public record NatsKVConfig
/// </summary>
public NatsKVRepublish? Republish { get; init; }

/// <summary>
/// Use compressed storage.
/// </summary>
public bool Compression { get; init; }

// TODO: Bucket mirror configuration.
// pub mirror: Option<Source>,
// Bucket sources configuration.
Expand Down
1 change: 1 addition & 0 deletions src/NATS.Client.KeyValueStore/NatsKVContext.cs
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,7 @@ public async ValueTask<INatsKVStore> CreateStoreAsync(NatsKVConfig config, Cance
MaxBytes = config.MaxBytes,
MaxAge = config.MaxAge.ToNanos(),
MaxMsgSize = config.MaxValueSize,
Compression = config.Compression ? StreamConfigurationCompression.s2 : StreamConfigurationCompression.none,
Storage = storage,
Republish = republish!,
AllowRollupHdrs = true,
Expand Down
5 changes: 3 additions & 2 deletions src/NATS.Client.KeyValueStore/NatsKVStore.cs
Original file line number Diff line number Diff line change
Expand Up @@ -354,7 +354,8 @@ public async IAsyncEnumerable<NatsKVEntry<T>> HistoryAsync<T>(string key, INatsD
public async ValueTask<NatsKVStatus> GetStatusAsync(CancellationToken cancellationToken = default)
{
await _stream.RefreshAsync(cancellationToken);
return new NatsKVStatus(Bucket, _stream.Info);
var isCompressed = _stream.Info.Config.Compression != StreamConfigurationCompression.none;
return new NatsKVStatus(Bucket, isCompressed, _stream.Info);
}

/// <summary>
Expand Down Expand Up @@ -457,4 +458,4 @@ internal async ValueTask<NatsKVWatcher<T>> WatchInternalAsync<T>(string key, INa
}
}

public record NatsKVStatus(string Bucket, StreamInfo Info);
public record NatsKVStatus(string Bucket, bool IsCompressed, StreamInfo Info);
5 changes: 5 additions & 0 deletions src/NATS.Client.ObjectStore/NatsObjConfig.cs
Original file line number Diff line number Diff line change
Expand Up @@ -44,4 +44,9 @@ public record NatsObjConfig(string Bucket)
/// Additional metadata for the bucket.
/// </summary>
public Dictionary<string, string>? Metadata { get; init; }

/// <summary>
/// Use compressed storage.
/// </summary>
public bool Compression { get; init; }
}
2 changes: 1 addition & 1 deletion src/NATS.Client.ObjectStore/NatsObjContext.cs
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ public async ValueTask<INatsObjStore> CreateObjectStore(NatsObjConfig config, Ca
AllowDirect = true,
Metadata = config.Metadata!,
Retention = StreamConfigurationRetention.limits,
Compression = StreamConfigurationCompression.none,
Compression = config.Compression ? StreamConfigurationCompression.s2 : StreamConfigurationCompression.none,
};

var stream = await _context.CreateStreamAsync(streamConfiguration, cancellationToken);
Expand Down
5 changes: 3 additions & 2 deletions src/NATS.Client.ObjectStore/NatsObjStore.cs
Original file line number Diff line number Diff line change
Expand Up @@ -533,7 +533,8 @@ public IAsyncEnumerable<ObjectMetadata> ListAsync(NatsObjListOpts? opts = defaul
public async ValueTask<NatsObjStatus> GetStatusAsync(CancellationToken cancellationToken = default)
{
await _stream.RefreshAsync(cancellationToken);
return new NatsObjStatus(Bucket, _stream.Info);
var isCompressed = _stream.Info.Config.Compression != StreamConfigurationCompression.none;
return new NatsObjStatus(Bucket, isCompressed, _stream.Info);
}

/// <summary>
Expand Down Expand Up @@ -678,4 +679,4 @@ public record NatsObjListOpts
public bool ShowDeleted { get; init; }
}

public record NatsObjStatus(string Bucket, StreamInfo Info);
public record NatsObjStatus(string Bucket, bool IsCompressed, StreamInfo Info);
29 changes: 29 additions & 0 deletions tests/NATS.Client.KeyValueStore.Tests/KeyValueStoreTest.cs
Original file line number Diff line number Diff line change
Expand Up @@ -522,4 +522,33 @@ public async Task Status()
Assert.Equal("KV_kv1", status.Info.Config.Name);
Assert.Equal(10, status.Info.Config.MaxMsgsPerSubject);
}

[SkipIfNatsServer(versionEarlierThan: "2.10")]
public async Task Compressed_storage()
{
await using var server = NatsServer.StartJS();
await using var nats = server.CreateClientConnection();

var js = new NatsJSContext(nats);
var kv = new NatsKVContext(js);

var cts = new CancellationTokenSource(TimeSpan.FromSeconds(10));
var cancellationToken = cts.Token;

var store1 = await kv.CreateStoreAsync(new NatsKVConfig("kv1") { Compression = false }, cancellationToken: cancellationToken);
var store2 = await kv.CreateStoreAsync(new NatsKVConfig("kv2") { Compression = true }, cancellationToken: cancellationToken);

Assert.Equal("kv1", store1.Bucket);
Assert.Equal("kv2", store2.Bucket);

var status1 = await store1.GetStatusAsync(cancellationToken);
Assert.Equal("kv1", status1.Bucket);
Assert.Equal("KV_kv1", status1.Info.Config.Name);
Assert.Equal(StreamConfigurationCompression.none, status1.Info.Config.Compression);

var status2 = await store2.GetStatusAsync(cancellationToken);
Assert.Equal("kv2", status2.Bucket);
Assert.Equal("KV_kv2", status2.Info.Config.Name);
Assert.Equal(StreamConfigurationCompression.s2, status2.Info.Config.Compression);
}
}
30 changes: 29 additions & 1 deletion tests/NATS.Client.ObjectStore.Tests/ObjectStoreTest.cs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
using System.IO.IsolatedStorage;
using System.Security.Cryptography;
using System.Text;
using NATS.Client.Core.Tests;
using NATS.Client.JetStream.Models;
using NATS.Client.ObjectStore.Internal;
using NATS.Client.ObjectStore.Models;

Expand Down Expand Up @@ -367,4 +367,32 @@ public async Task List()
Assert.True(infos[3].Deleted);
}
}

[SkipIfNatsServer(versionEarlierThan: "2.10")]
public async Task Compressed_storage()
{
await using var server = NatsServer.StartJS();
await using var nats = server.CreateClientConnection();
var js = new NatsJSContext(nats);
var obj = new NatsObjContext(js);

var cts = new CancellationTokenSource(TimeSpan.FromSeconds(10));
var cancellationToken = cts.Token;

var store1 = await obj.CreateObjectStore(new NatsObjConfig("b1") { Compression = false }, cancellationToken);
var store2 = await obj.CreateObjectStore(new NatsObjConfig("b2") { Compression = true }, cancellationToken);

Assert.Equal("b1", store1.Bucket);
Assert.Equal("b2", store2.Bucket);

var status1 = await store1.GetStatusAsync(cancellationToken);
Assert.Equal("b1", status1.Bucket);
Assert.Equal("OBJ_b1", status1.Info.Config.Name);
Assert.Equal(StreamConfigurationCompression.none, status1.Info.Config.Compression);

var status2 = await store2.GetStatusAsync(cancellationToken);
Assert.Equal("b2", status2.Bucket);
Assert.Equal("OBJ_b2", status2.Info.Config.Name);
Assert.Equal(StreamConfigurationCompression.s2, status2.Info.Config.Compression);
}
}
10 changes: 8 additions & 2 deletions tests/NATS.Client.TestUtilities/NatsServer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,10 @@ public static class ServerVersions

public class NatsServer : IAsyncDisposable
{
public static readonly Version Version;

private static readonly string Ext = RuntimeInformation.IsOSPlatform(OSPlatform.Windows) ? ".exe" : string.Empty;
private static readonly string NatsServerPath = $"nats-server{Ext}";
private static readonly Version Version;

private readonly string? _jetStreamStoreDir;
private readonly ITestOutputHelper _outputHelper;
Expand Down Expand Up @@ -550,11 +551,16 @@ public sealed class SkipIfNatsServer : FactAttribute

static SkipIfNatsServer() => SupportsTlsFirst = NatsServer.SupportsTlsFirst();

public SkipIfNatsServer(bool doesNotSupportTlsFirst = false)
public SkipIfNatsServer(bool doesNotSupportTlsFirst = false, string? versionEarlierThan = default)
{
if (doesNotSupportTlsFirst && !SupportsTlsFirst)
{
Skip = "NATS server doesn't support TLS first";
}

if (versionEarlierThan != null && new Version(versionEarlierThan) > NatsServer.Version)
{
Skip = $"NATS server version ({NatsServer.Version}) is earlier than {versionEarlierThan}";
}
}
}