Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add a remote storage based on the Azure Table API #339

Merged
merged 9 commits into from
Jul 30, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions .github/workflows/release.yml
Original file line number Diff line number Diff line change
Expand Up @@ -42,5 +42,7 @@ jobs:
run: dotnet pack metrics/Streamiz.Kafka.Net.Metrics.Prometheus/Streamiz.Kafka.Net.Metrics.Prometheus.csproj --configuration Release --no-build --no-restore
- name: Pack Metrics OpenTelemetry
run: dotnet pack metrics/Streamiz.Kafka.Net.Metrics.OpenTelemetry/Streamiz.Kafka.Net.Metrics.OpenTelemetry.csproj --configuration Release --no-build --no-restore
- name: Pack Azure Remote Storage
run: dotnet pack remote/Streamiz.Kafka.Net.Azure.RemoteStorage/Streamiz.Kafka.Net.Azure.RemoteStorage.csproj --configuration Release --no-build --no-restore
- name: Publish in nuget.org
run: dotnet nuget push **/*.nupkg -k ${{ secrets.NUGET_PACKAGE_TOKEN }} -s https://api.nuget.org/v3/index.json -n -d
1 change: 1 addition & 0 deletions core/KafkaStream.cs
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
[assembly: InternalsVisibleTo("Streamiz.Kafka.Net.Tests, PublicKey=00240000048000009400000006020000002400005253413100040000010001000d9d4a8e90a3b987f68f047ec499e5a3405b46fcad30f52abadefca93b5ebce094d05976950b38cc7f0855f600047db0a351ede5e0b24b9d5f1de6c59ab55dee145da5d13bb86f7521b918c35c71ca5642fc46ba9b04d4900725a2d4813639ff47898e1b762ba4ccd5838e2dd1e1664bd72bf677d872c87749948b1174bd91ad")]
[assembly: InternalsVisibleTo("Streamiz.Kafka.Net.Metrics.Prometheus, PublicKey=00240000048000009400000006020000002400005253413100040000010001000d9d4a8e90a3b987f68f047ec499e5a3405b46fcad30f52abadefca93b5ebce094d05976950b38cc7f0855f600047db0a351ede5e0b24b9d5f1de6c59ab55dee145da5d13bb86f7521b918c35c71ca5642fc46ba9b04d4900725a2d4813639ff47898e1b762ba4ccd5838e2dd1e1664bd72bf677d872c87749948b1174bd91ad")]
[assembly: InternalsVisibleTo("Streamiz.Kafka.Net.Metrics.OpenTelemetry, PublicKey=00240000048000009400000006020000002400005253413100040000010001000d9d4a8e90a3b987f68f047ec499e5a3405b46fcad30f52abadefca93b5ebce094d05976950b38cc7f0855f600047db0a351ede5e0b24b9d5f1de6c59ab55dee145da5d13bb86f7521b918c35c71ca5642fc46ba9b04d4900725a2d4813639ff47898e1b762ba4ccd5838e2dd1e1664bd72bf677d872c87749948b1174bd91ad")]
[assembly: InternalsVisibleTo("Streamiz.Kafka.Net.Azure.RemoteStorage, PublicKey=00240000048000009400000006020000002400005253413100040000010001000d9d4a8e90a3b987f68f047ec499e5a3405b46fcad30f52abadefca93b5ebce094d05976950b38cc7f0855f600047db0a351ede5e0b24b9d5f1de6c59ab55dee145da5d13bb86f7521b918c35c71ca5642fc46ba9b04d4900725a2d4813639ff47898e1b762ba4ccd5838e2dd1e1664bd72bf677d872c87749948b1174bd91ad")]
[assembly: InternalsVisibleTo("DynamicProxyGenAssembly2, PublicKey=0024000004800000940000000602000000240000525341310004000001000100c547cac37abd99c8db225ef2f6c8a3602f3b3606cc9891605d02baa56104f4cfc0734aa39b93bf7852f7d9266654753cc297e7d2edfe0bac1cdcf9f717241550e0a7b191195b7667bb4f64bcb8e2121380fd1d9d46ad2d92d2d15605093924cceaf74c4861eff62abf69b9291ed0a340e113be11e6a7d3113e92484cf7045cc7")]

namespace Streamiz.Kafka.Net
Expand Down
1 change: 1 addition & 0 deletions core/Mock/Kafka/MockReadOnlyKeyValueStore.cs
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ internal class MockReadOnlyKeyValueStore<K, V> : IStateStore, IReadOnlyKeyValueS
public string Name => "MockStore";

public bool Persistent => false;
public bool IsLocally => true;

public bool IsOpen => true;

Expand Down
6 changes: 6 additions & 0 deletions core/Processors/IStateStore.cs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,12 @@ public interface IStateStore
/// Return if the storage is persistent or not.
/// </summary>
bool Persistent { get; }

/// <summary>
/// Return if the storage is present locally or not.
/// If the store is local, an internal kafka topic will be created to offload the data
/// </summary>
bool IsLocally { get; }

/// <summary>
/// Is this store open for reading and writing
Expand Down
13 changes: 8 additions & 5 deletions core/Processors/Internal/ProcessorStateManager.cs
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ internal class StateStoreMetadata

public IDictionary<TopicPartition, long> ChangelogOffsets =>
registeredStores.Values
.Where(s => s.ChangelogTopicPartition != null)
.Where(s => s.ChangelogTopicPartition != null && s.Store.IsLocally)
.ToDictionary(s => s.ChangelogTopicPartition, s =>
{
if (s.Offset.HasValue)
Expand Down Expand Up @@ -81,8 +81,8 @@ public static string StoreChangelogTopic(string applicationId, String storeName)
private bool IsChangelogStateStore(string storeName)
=> changelogTopics.ContainsKey(storeName);

private TopicPartition GetStorePartition(string storeName)
=> new TopicPartition(changelogTopics[storeName], taskId.Partition);
private TopicPartition GetStorePartition(string storeName)
=> new(changelogTopics[storeName], taskId.Partition);

#region State Manager IMPL

Expand Down Expand Up @@ -200,7 +200,9 @@ public void Checkpoint()
IDictionary<TopicPartition, long> checkpointOffsets = new Dictionary<TopicPartition, long>();
foreach(var store in registeredStores)
{
if (store.Value.ChangelogTopicPartition != null && store.Value.Store.Persistent)
if (store.Value.ChangelogTopicPartition != null
&& store.Value.Store.Persistent
&& store.Value.Store.IsLocally)
{
checkpointOffsets.Add(store.Value.ChangelogTopicPartition, store.Value.Offset.HasValue ? store.Value.Offset.Value : OffsetCheckpointFile.OFFSET_UNKNOWN);
}
Expand All @@ -223,8 +225,9 @@ public void InitializeOffsetsFromCheckpoint()

foreach(var kvStore in registeredStores)
{
if(kvStore.Value.ChangelogTopicPartition == null)
if(kvStore.Value.ChangelogTopicPartition == null || !kvStore.Value.Store.IsLocally)
{
kvStore.Value.ChangelogTopicPartition = null;
log.LogInformation($"State store {kvStore.Value.Store.Name} is not logged and hence would not be restored");
}
else if (!kvStore.Value.Store.Persistent)
Expand Down
29 changes: 16 additions & 13 deletions core/Processors/Internal/StoreChangelogReader.cs
Original file line number Diff line number Diff line change
Expand Up @@ -88,20 +88,23 @@ public void Register(TopicPartition topicPartition, ProcessorStateManager proces
if (storeMetadata == null)
throw new StreamsException($"Cannot find the corresponding state store metadata for changelog {topicPartition}");

var changelogMetadata = new ChangelogMetadata
if (storeMetadata.Store.IsLocally)
{
StoreMetadata = storeMetadata,
StateManager = processorStateManager,
ChangelogState = ChangelogState.REGISTERED,
RestoreEndOffset = null,
BeginOffset = null,
CurrentOffset = null,
TotalRestored = 0,
BufferedLimit = 0,
BufferedRecords = new List<ConsumeResult<byte[], byte[]>>()
};

changelogs.Add(topicPartition, changelogMetadata);
var changelogMetadata = new ChangelogMetadata
{
StoreMetadata = storeMetadata,
StateManager = processorStateManager,
ChangelogState = ChangelogState.REGISTERED,
RestoreEndOffset = null,
BeginOffset = null,
CurrentOffset = null,
TotalRestored = 0,
BufferedLimit = 0,
BufferedRecords = new List<ConsumeResult<byte[], byte[]>>()
};

changelogs.Add(topicPartition, changelogMetadata);
}
}

// Workflow :
Expand Down
2 changes: 1 addition & 1 deletion core/Processors/Internal/TaskManager.cs
Original file line number Diff line number Diff line change
Expand Up @@ -278,7 +278,7 @@ internal bool TryToCompleteRestoration()
var restored = changelogReader.CompletedChangelogs;
foreach(var task in activeTasksWithStateStore)
{
if (restored.Any() && task.ChangelogPartitions.ContainsAll(restored))
if (!task.ChangelogPartitions.Any() || (restored.Any() && task.ChangelogPartitions.ContainsAll(restored)))
task.CompleteRestoration();
else
allRunning = false;
Expand Down
1 change: 1 addition & 0 deletions core/State/Cache/Internal/MemoryCache.cs
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ internal sealed class MemoryCache<K, V> : IMemoryCache<K, V>
where K : class, IComparable<K>
where V : class
{
// https://github.com/mkrebser/ConcurrentSortedDictionary
private readonly IComparer<K> _keyComparer;
private readonly IClockTime _clockTime;
internal readonly ILogger Logger;
Expand Down
5 changes: 5 additions & 0 deletions core/State/InMemory/InMemoryKeyValueStore.cs
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,11 @@ public InMemoryKeyValueStore(string name)
/// Return always false in <see cref="InMemoryKeyValueStore"/>
/// </summary>
public bool Persistent => false;

/// <summary>
/// Return always true
/// </summary>
public bool IsLocally => true;

/// <summary>
/// Is open
Expand Down
1 change: 1 addition & 0 deletions core/State/InMemory/InMemoryWindowStore.cs
Original file line number Diff line number Diff line change
Expand Up @@ -290,6 +290,7 @@ public InMemoryWindowStore(string storeName, TimeSpan retention, long size, bool
public string Name { get; }

public bool Persistent => false;
public bool IsLocally => true;

public bool IsOpen { get; private set; } = false;

Expand Down
1 change: 1 addition & 0 deletions core/State/Internal/WrappedStateStore.cs
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ public WrappedStateStore(S wrapped)
public virtual string Name => wrapped.Name;

public virtual bool Persistent => wrapped.Persistent;
public virtual bool IsLocally => wrapped.IsLocally;

public virtual bool IsOpen => wrapped.IsOpen;

Expand Down
3 changes: 2 additions & 1 deletion core/State/KeyValueStoreBuilder.cs
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,8 @@ public override IKeyValueStore<K, V> Build()

private IKeyValueStore<Bytes, byte[]> WrapLogging(IKeyValueStore<Bytes, byte[]> inner)
{
return !LoggingEnabled ? inner : new ChangeLoggingKeyValueBytesStore(inner);
return !LoggingEnabled || !inner.IsLocally ?
inner : new ChangeLoggingKeyValueBytesStore(inner);
}

private IKeyValueStore<Bytes, byte[]> WrapCaching(IKeyValueStore<Bytes, byte[]> inner)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ public AbstractRocksDBSegmentedBytesStore(
public string Name { get; }

public bool Persistent => true;
public bool IsLocally => true;

public bool IsOpen => isOpen;

Expand Down
5 changes: 5 additions & 0 deletions core/State/RocksDb/RocksDbKeyValueStore.cs
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,11 @@ public RocksDbKeyValueStore(string name, string parentDir)
/// Definitely True
/// </summary>
public bool Persistent => true;

/// <summary>
/// Definitely True
/// </summary>
public bool IsLocally => true;

/// <summary>
/// return if the state store is open or not
Expand Down
2 changes: 1 addition & 1 deletion core/State/TimestampedWindowStoreBuilder.cs
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ public override ITimestampedWindowStore<K, V> Build()

private IWindowStore<Bytes, byte[]> WrapLogging(IWindowStore<Bytes, byte[]> inner)
{
if (!LoggingEnabled)
if (!LoggingEnabled || !inner.IsLocally)
return inner;

return new ChangeLoggingTimestampedWindowBytesStore(inner, supplier.RetainDuplicates);
Expand Down
39 changes: 36 additions & 3 deletions core/StreamConfig.cs
Original file line number Diff line number Diff line change
Expand Up @@ -553,11 +553,21 @@ public int GetHashCode(KeyValuePair<string, string> obj)
private readonly IDictionary<string, PropertyInfo> cacheProperties
= new Dictionary<string, PropertyInfo>();

private readonly Dictionary<string, dynamic> configProperties
= new Dictionary<string, dynamic>();
private readonly Dictionary<string, dynamic> configProperties = new();

internal long MetricsMinIntervalMs { get; set; } = 30000;


/// <summary>
/// Get the config value of the key. Null if any key found OR
/// Add a new key/value configuration.
/// </summary>
/// <param name="key">key to get or add</param>
public dynamic this[string key]
{
get => Get(key);
set => AddConfig(key, value);
}

#region Middlewares

/// <summary>
Expand Down Expand Up @@ -3364,4 +3374,27 @@ public StreamConfig(IDictionary<string, dynamic> properties)
DefaultValueSerDes = new VS();
}
}


public static class StreamConfigExtensions
{
public static T Read<T>(this IStreamConfig config)
where T : class
{
T t = Activator.CreateInstance<T>();

foreach (var p in typeof(T).GetProperties())
{
var streamConfigAttr = p.GetCustomAttribute<StreamConfigPropertyAttribute>();
if (streamConfigAttr != null && !streamConfigAttr.ReadOnly)
{
var r = config.Get(streamConfigAttr.KeyName);
if (r != null)
p.SetValue(t, r);
}
}

return t;
}
}
}
Loading
Loading