diff --git a/core/State/InMemory/InMemoryWindowStore.cs b/core/State/InMemory/InMemoryWindowStore.cs
index 997c9401..fb981db5 100644
--- a/core/State/InMemory/InMemoryWindowStore.cs
+++ b/core/State/InMemory/InMemoryWindowStore.cs
@@ -12,6 +12,7 @@
 using Streamiz.Kafka.Net.Metrics;
 using Streamiz.Kafka.Net.Metrics.Internal;
 using Streamiz.Kafka.Net.State.Helper;
+using Streamiz.Kafka.Net.State.InMemory.Internal;
 
 namespace Streamiz.Kafka.Net.State.InMemory
 {
@@ -260,7 +261,8 @@ public Windowed<Bytes> PeekNextKey()
 
         #endregion
 
-    internal class InMemoryWindowStore : IWindowStore<Bytes, byte[]>
+    internal class
+        InMemoryWindowStore : IWindowStore<Bytes, byte[]>
     {
         private readonly TimeSpan retention;
         private readonly long size;
@@ -272,8 +274,7 @@ internal class InMemoryWindowStore : IWindowStore<Bytes, byte[]>
         private int seqnum = 0;
 
         private readonly ConcurrentDictionary<long, ConcurrentDictionary<Bytes, byte[]>> map = new();
-
-        private readonly ISet<InMemoryWindowStoreEnumeratorWrapper> openIterators = new HashSet<InMemoryWindowStoreEnumeratorWrapper>();
+        private readonly ConcurrentSet<InMemoryWindowStoreEnumeratorWrapper> openIterators = new();
 
         private readonly ILogger logger = Logger.GetLogger(typeof(InMemoryWindowStore));
 
@@ -309,8 +310,9 @@ public void Close()
             if (openIterators.Count != 0)
             {
                 logger.LogWarning("Closing {OpenIteratorCount} open iterators for store {Name}", openIterators.Count, Name);
-                for (int i = 0; i < openIterators.Count; ++i)
-                    openIterators.ElementAt(i).Close();
+                foreach (var iterator in openIterators)
+                    iterator.Close();
+                openIterators.Clear();
             }
 
             map.Clear();
diff --git a/core/State/InMemory/Internal/ConcurrentHashSet.cs b/core/State/InMemory/Internal/ConcurrentHashSet.cs
new file mode 100644
index 00000000..e898b396
--- /dev/null
+++ b/core/State/InMemory/Internal/ConcurrentHashSet.cs
@@ -0,0 +1,71 @@
+using System.Collections.Concurrent;
+using System.Collections.Generic;
+
+namespace Streamiz.Kafka.Net.State.InMemory.Internal
+{
+    internal class ConcurrentSet<T>
+    {
+        private readonly ConcurrentDictionary<T, byte> _dictionary = new();
+
+        /// <summary>
+        /// Returns an enumerator that iterates through the collection.
+        /// </summary>
+        /// <returns>
+        /// An <see cref="IEnumerator{T}"/> that can be used to iterate through the collection.
+        /// </returns>
+        public IEnumerator<T> GetEnumerator()
+        {
+            return _dictionary.Keys.GetEnumerator();
+        }
+
+        /// <summary>
+        /// Removes the first occurrence of a specific object from the <see cref="ConcurrentSet{T}"/>.
+        /// </summary>
+        /// <returns>
+        /// true if <paramref name="item"/> was successfully removed from the <see cref="ConcurrentSet{T}"/>; otherwise, false.
+        /// This method also returns false if <paramref name="item"/> is not found in the original <see cref="ConcurrentSet{T}"/>.
+        /// </returns>
+        /// <param name="item">The object to remove from the <see cref="ConcurrentSet{T}"/>.</param>
+        public bool Remove(T item)
+        {
+            return TryRemove(item);
+        }
+
+        /// <summary>
+        /// Gets the number of elements in the set.
+        /// </summary>
+        public int Count => _dictionary.Count;
+
+        /// <summary>
+        /// Adds an element to the current set and returns a value to indicate if the element was successfully added.
+        /// </summary>
+        /// <returns>
+        /// true if the element is added to the set; false if the element is already in the set.
+        /// </returns>
+        /// <param name="item">The element to add to the set.</param>
+        public bool Add(T item)
+        {
+            return TryAdd(item);
+        }
+
+        /// <summary>
+        /// Removes all items from the set.
+        /// </summary>
+        public void Clear()
+        {
+            _dictionary.Clear();
+        }
+
+        /// <summary>
+        /// Determines whether the set contains the specified element.
+        /// </summary>
+        public bool Contains(T item)
+        {
+            return _dictionary.ContainsKey(item);
+        }
+
+        private bool TryAdd(T item)
+        {
+            // the byte value is a dummy; only the key matters
+            return _dictionary.TryAdd(item, default);
+        }
+
+        private bool TryRemove(T item)
+        {
+            return _dictionary.TryRemove(item, out _);
+        }
+    }
+}
\ No newline at end of file
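Why the new set type fixes the close loop: `Close()` used to index into `openIterators` with `ElementAt(i)` while each wrapper's `Close()` can unregister it from the very same set, so `Count` shrank under the loop, iterators were skipped, and concurrent mutation could make the re-enumeration throw. `ConcurrentSet<T>` enumerates `ConcurrentDictionary.Keys`, which .NET returns as a point-in-time snapshot, so the `foreach` is immune to mutation. A minimal sketch of the difference (`SelfRemovingIterator` is a hypothetical stand-in for the store's enumerator wrapper, not a type from the PR):

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

internal static class CloseLoopSketch
{
    // Hypothetical stand-in for the store's enumerator wrapper:
    // closing it removes it from the set of open iterators.
    private sealed class SelfRemovingIterator
    {
        private readonly ICollection<SelfRemovingIterator> owner;
        public SelfRemovingIterator(ICollection<SelfRemovingIterator> owner) => this.owner = owner;
        public void Close() => owner.Remove(this);
    }

    public static void Main()
    {
        var openIterators = new HashSet<SelfRemovingIterator>();
        for (var i = 0; i < 4; i++)
            openIterators.Add(new SelfRemovingIterator(openIterators));

        // Old pattern: Count shrinks while i grows, so only half the
        // iterators get closed, and ElementAt() re-enumerates a set that
        // is being mutated:
        //   for (int i = 0; i < openIterators.Count; ++i)
        //       openIterators.ElementAt(i).Close();

        // New pattern: iterate a snapshot (which ConcurrentSet's enumerator
        // gives for free via ConcurrentDictionary.Keys), then clear.
        foreach (var iterator in openIterators.ToArray())
            iterator.Close();
        openIterators.Clear();

        Console.WriteLine(openIterators.Count); // prints 0
    }
}
```

The trailing `openIterators.Clear()` also makes the outcome deterministic even if a wrapper's `Close()` does not unregister itself.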
diff --git a/environment/datagen_connector.json b/environment/datagen_connector.json
new file mode 100644
index 00000000..66abd216
--- /dev/null
+++ b/environment/datagen_connector.json
@@ -0,0 +1,14 @@
+{
+  "name": "datagen-users",
+  "config": {
+    "connector.class": "io.confluent.kafka.connect.datagen.DatagenConnector",
+    "kafka.topic": "users",
+    "quickstart": "users",
+    "key.converter": "org.apache.kafka.connect.storage.StringConverter",
+    "value.converter": "org.apache.kafka.connect.json.JsonConverter",
+    "value.converter.schemas.enable": "false",
+    "max.interval": 1000,
+    "iterations": 10000000,
+    "tasks.max": "1"
+  }
+}
\ No newline at end of file
diff --git a/environment/docker-compose-with-connect.yml b/environment/docker-compose-with-connect.yml
index c65dad8d..3a4489ca 100644
--- a/environment/docker-compose-with-connect.yml
+++ b/environment/docker-compose-with-connect.yml
@@ -2,7 +2,7 @@ version: '2'
 services:
   zookeeper:
-    image: confluentinc/cp-zookeeper:7.4.0
+    image: confluentinc/cp-zookeeper:7.6.1
     hostname: zookeeper
     container_name: zookeeper
     ports:
@@ -13,7 +13,7 @@ services:
       KAFKA_OPTS: "-Dzookeeper.4lw.commands.whitelist=*"
 
   broker:
-    image: confluentinc/cp-server:7.4.0
+    image: confluentinc/cp-server:7.6.1
     hostname: broker
     container_name: broker
     depends_on:
@@ -42,7 +42,7 @@
       CONFLUENT_SUPPORT_CUSTOMER_ID: 'anonymous'
 
   schema-registry:
-    image: confluentinc/cp-schema-registry:7.4.0
+    image: confluentinc/cp-schema-registry:7.6.1
     hostname: schema-registry
     container_name: schema-registry
    depends_on:
@@ -55,7 +55,7 @@
       SCHEMA_REGISTRY_LISTENERS: http://0.0.0.0:8081
 
   connect:
-    image: cnfldemos/kafka-connect-datagen:0.6.0-7.3.0
+    image: cnfldemos/kafka-connect-datagen:0.6.4-7.6.0
     container_name: connect
     depends_on:
     - broker
diff --git a/samples/sample-stream/Program.cs b/samples/sample-stream/Program.cs
index 3238a83a..b1af4a78 100644
--- a/samples/sample-stream/Program.cs
+++ b/samples/sample-stream/Program.cs
@@ -14,7 +14,7 @@ namespace sample_stream
 {
     public static class Program
     {
-        public static async Task Main(string[] args)
+        public static async Task Main2(string[] args)
         {
             var config = new StreamConfig<StringSerDes, StringSerDes>{
                 ApplicationId = $"test-app",
diff --git a/samples/sample-stream/Reproducer314.cs b/samples/sample-stream/Reproducer314.cs
new file mode 100644
index 00000000..44230759
--- /dev/null
+++ b/samples/sample-stream/Reproducer314.cs
@@ -0,0 +1,75 @@
+using System;
+using System.Threading;
+using System.Threading.Tasks;
+using Confluent.Kafka;
+using Streamiz.Kafka.Net;
+using Streamiz.Kafka.Net.SerDes;
+using Streamiz.Kafka.Net.State;
+using Streamiz.Kafka.Net.Stream;
+using Streamiz.Kafka.Net.Table;
+
+namespace sample_stream;
+
+public class Reproducer314
+{
+    public static async Task Main(string[] args)
+    {
+        Console.WriteLine("Hello Streams");
+
+        var config = new StreamConfig<StringSerDes, StringSerDes>
+        {
+            ApplicationId = $"test-windowedtable-bis",
+            BootstrapServers = "localhost:9092",
+            AutoOffsetReset = AutoOffsetReset.Earliest
+        };
+
+        var builder = CreateWindowedStore();
+        var t = builder.Build();
+        var windowedTableStream = new KafkaStream(t, config);
+
+        await windowedTableStream.StartAsync();
+
+        // wait for the store to be restored and ready
+        Thread.Sleep(10000);
+
+        GetValueFromWindowedStore(windowedTableStream, DateTime.UtcNow.AddHours(-1), new CancellationToken());
+
+        Console.WriteLine("Finished");
+    }
+
+    private static void GetValueFromWindowedStore(KafkaStream windowedTableStream, DateTime startUtcForWindowLookup, CancellationToken cancellationToken)
+    {
+        var windowedStore = windowedTableStream.Store(StoreQueryParameters.FromNameAndType("store", QueryableStoreTypes.WindowStore<string, int>()));
+
+        while (!cancellationToken.IsCancellationRequested)
+        {
+            var records = windowedStore.FetchAll(startUtcForWindowLookup, DateTime.UtcNow).ToList();
+
+            if (records.Count > 0)
+            {
+                foreach (var item in records)
+                {
+                    Console.WriteLine($"Value from windowed store : KEY = {item.Key} VALUE = {item.Value}");
+                }
+
+                startUtcForWindowLookup = DateTime.UtcNow;
+            }
+        }
+    }
+
+    private static StreamBuilder CreateWindowedStore()
+    {
+        var builder = new StreamBuilder();
+
+        builder
+            .Stream<string, string>("users")
+            .GroupByKey()
+            .WindowedBy(TumblingWindowOptions.Of(60000))
+            .Aggregate(
+                () => 0,
+                (k, v, agg) => Math.Max(v.Length, agg),
+                InMemoryWindows.As<string, int>("store").WithValueSerdes<Int32SerDes>());
+
+        return builder;
+    }
+}
\ No newline at end of file
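The reproducer's query loop never sees a cancelled token, and the stream is never disposed. If a clean shutdown is wanted, the body of `Main` could be wired along these lines; a sketch only, assuming `KafkaStream.StartAsync` accepts a cancellation token as in the other Streamiz samples, and not part of the PR:

```csharp
// Hypothetical shutdown wiring for the reproducer (not part of the PR):
// Ctrl+C cancels the token, the query loop exits, and the stream is disposed.
var source = new CancellationTokenSource();
Console.CancelKeyPress += (_, eventArgs) =>
{
    eventArgs.Cancel = true;   // keep the process alive until Dispose() runs
    source.Cancel();
};

await windowedTableStream.StartAsync(source.Token);

// wait for the store to be restored and ready
Thread.Sleep(10000);

// the loop now stops on Ctrl+C instead of running forever
GetValueFromWindowedStore(windowedTableStream, DateTime.UtcNow.AddHours(-1), source.Token);

windowedTableStream.Dispose();
```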
diff --git a/test/Streamiz.Kafka.Net.Tests/Private/ConcurrentSetTests.cs b/test/Streamiz.Kafka.Net.Tests/Private/ConcurrentSetTests.cs
new file mode 100644
index 00000000..d76168a0
--- /dev/null
+++ b/test/Streamiz.Kafka.Net.Tests/Private/ConcurrentSetTests.cs
@@ -0,0 +1,93 @@
+using System;
+using System.Collections.Generic;
+using System.Threading.Tasks;
+using NUnit.Framework;
+using Streamiz.Kafka.Net.State.InMemory.Internal;
+
+namespace Streamiz.Kafka.Net.Tests.Private;
+
+public class ConcurrentSetTests
+{
+    private ConcurrentSet<string> concurrentSet;
+
+    [SetUp]
+    public void Init()
+    {
+        concurrentSet = new();
+    }
+
+    [TearDown]
+    public void Dispose()
+    {
+        concurrentSet.Clear();
+    }
+
+    [TestCase(1000)]
+    public void ConcurrencyAdded(int numberTasks)
+    {
+        var taskList = new List<Task>();
+        for (int i = 0; i < numberTasks; i++)
+        {
+            taskList.Add(Task.Factory.StartNew((Object obj) =>
+            {
+                concurrentSet.Add(Guid.NewGuid().ToString());
+            }, null));
+        }
+        Task.WaitAll(taskList.ToArray());
+        Assert.AreEqual(numberTasks, concurrentSet.Count);
+    }
+
+    [TestCase(1000)]
+    public void ConcurrencyRemoved(int numberTasks)
+    {
+        for (int i = 0; i < numberTasks; i++)
+            concurrentSet.Add(i.ToString());
+
+        var taskList = new List<Task>();
+        for (int i = 0; i < numberTasks; i++)
+        {
+            taskList.Add(Task.Factory.StartNew((Object obj) =>
+            {
+                concurrentSet.Remove(obj.ToString());
+            }, i));
+        }
+
+        Task.WaitAll(taskList.ToArray());
+        Assert.AreEqual(0, concurrentSet.Count);
+    }
+
+    [TestCase(10000)]
+    public void ConcurrencyAddedAndForeach(int numberTasks)
+    {
+        var taskList = new List<Task>();
+        for (int i = 0; i < numberTasks; i++)
+        {
+            taskList.Add(Task.Factory.StartNew((Object obj) =>
+            {
+                concurrentSet.Add(Guid.NewGuid().ToString());
+                // enumerate while other tasks mutate the set; must not throw
+                foreach (var c in concurrentSet)
+                    ;
+            }, null));
+        }
+        Task.WaitAll(taskList.ToArray());
+        Assert.AreEqual(numberTasks, concurrentSet.Count);
+    }
+
+    [TestCase(10000)]
+    public void ConcurrencyAddedAndContains(int numberTasks)
+    {
+        var taskList = new List<Task>();
+        for (int i = 0; i < numberTasks; i++)
+        {
+            taskList.Add(Task.Factory.StartNew((Object obj) =>
+            {
+                var guid = Guid.NewGuid().ToString();
+                concurrentSet.Add(guid);
+                Assert.IsTrue(concurrentSet.Contains(guid));
+            }, null));
+        }
+        Task.WaitAll(taskList.ToArray());
+        Assert.AreEqual(numberTasks, concurrentSet.Count);
+    }
+
+}
\ No newline at end of file
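One design note on `ConcurrentSet<T>`: exposing a public `GetEnumerator()` is enough for the pattern-based `foreach` used in `Close()` and in `ConcurrencyAddedAndForeach`, but since the type does not implement `IEnumerable<T>`, LINQ operators over the set will not compile. A possible extension, sketched under the assumption that nothing else constrains the type (the name `EnumerableConcurrentSet` is hypothetical, not from the PR):

```csharp
using System.Collections;
using System.Collections.Concurrent;
using System.Collections.Generic;

// Sketch: the same ConcurrentDictionary-backed set, made LINQ-friendly by
// implementing IEnumerable<T>. Enumeration still iterates a snapshot of
// Keys, so it stays safe under concurrent mutation.
internal class EnumerableConcurrentSet<T> : IEnumerable<T> where T : notnull
{
    private readonly ConcurrentDictionary<T, byte> _dictionary = new();

    public int Count => _dictionary.Count;
    public bool Add(T item) => _dictionary.TryAdd(item, default);
    public bool Remove(T item) => _dictionary.TryRemove(item, out _);
    public bool Contains(T item) => _dictionary.ContainsKey(item);
    public void Clear() => _dictionary.Clear();

    public IEnumerator<T> GetEnumerator() => _dictionary.Keys.GetEnumerator();
    IEnumerator IEnumerable.GetEnumerator() => GetEnumerator();
}
```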