diff --git a/core/Mock/ClusterInMemoryTopologyDriver.cs b/core/Mock/ClusterInMemoryTopologyDriver.cs index ce63c794..ea49190e 100644 --- a/core/Mock/ClusterInMemoryTopologyDriver.cs +++ b/core/Mock/ClusterInMemoryTopologyDriver.cs @@ -10,6 +10,7 @@ using System.IO; using System.Threading; using Streamiz.Kafka.Net.Metrics; +using Streamiz.Kafka.Net.State; using Streamiz.Kafka.Net.Stream.Internal; namespace Streamiz.Kafka.Net.Mock @@ -169,17 +170,22 @@ public void Dispose() public IStateStore GetStateStore(string name) { + bool windowStore = false; IList stores = new List(); foreach (var task in threadTopology.ActiveTasks) { var store = task.GetStore(name); if (store != null) { + if (store is IReadOnlyWindowStore || store is ITimestampedWindowStore) + windowStore = true; stores.Add(store); } } - return stores.Count > 0 ? new MockReadOnlyKeyValueStore(stores) : null; + return stores.Count > 0 ? + !windowStore ? new MockReadOnlyKeyValueStore(stores) : new MockReadOnlyWindowStore(stores) + : null; } public void StartDriver() diff --git a/core/Mock/Kafka/MockReadOnlyWindowStore.cs b/core/Mock/Kafka/MockReadOnlyWindowStore.cs new file mode 100644 index 00000000..3135ad29 --- /dev/null +++ b/core/Mock/Kafka/MockReadOnlyWindowStore.cs @@ -0,0 +1,105 @@ +using System; +using System.Collections.Generic; +using System.Linq; +using Streamiz.Kafka.Net.Crosscutting; +using Streamiz.Kafka.Net.Processors; +using Streamiz.Kafka.Net.State; +using Streamiz.Kafka.Net.State.Enumerator; +using Streamiz.Kafka.Net.State.Internal; + +namespace Streamiz.Kafka.Net.Mock.Kafka +{ + internal class MockReadOnlyWindowStore : IStateStore, IReadOnlyWindowStore + { + private readonly IEnumerable stores; + + public string Name => "MockWindowStore"; + public bool Persistent => false; + public bool IsLocally => true; + + public bool IsOpen => true; + + public MockReadOnlyWindowStore(IEnumerable stores) + { + this.stores = stores; + } + + private IEnumerable> GetAllStores() + { + var readonlystores = stores + .OfType>() + .ToList(); + + var timestamp = stores + .OfType>() + .Select(s => new ReadOnlyWindowStoreFacade(s)); + + readonlystores.AddRange(timestamp); + return readonlystores; + } + + public void Init(ProcessorContext context, IStateStore root) + { + context.Register(root, null); + } + + public void Flush() + { + // NOTHING + } + + public void Close() + { + // NOTHING + } + + public V Fetch(K key, long time) + { + foreach (var store in GetAllStores()) + { + var result = store.Fetch(key, time); + if (result != null) + { + return result; + } + } + + return default; + } + + public IWindowStoreEnumerator Fetch(K key, DateTime from, DateTime to) + => Fetch(key, from.GetMilliseconds(), to.GetMilliseconds()); + + public IWindowStoreEnumerator Fetch(K key, long from, long to) + { + foreach (var store in GetAllStores()) + { + var it = store.Fetch(key, from, to); + if (!it.MoveNext()) + { + it.Dispose(); + } + else + { + it.Reset(); + return it; + } + } + return new EmptyWindowStoreEnumerator(); + } + + public IKeyValueEnumerator, V> All() + { + return new CompositeKeyValueEnumerator, V, IReadOnlyWindowStore>( + GetAllStores(), + (store) => store.All()); + } + + public IKeyValueEnumerator, V> FetchAll(DateTime from, DateTime to) + { + return new CompositeKeyValueEnumerator, V, IReadOnlyWindowStore>( + GetAllStores(), + (store) => store.FetchAll(from, to)); + } + } +} \ No newline at end of file diff --git a/core/Mock/TopologyTestDriver.cs b/core/Mock/TopologyTestDriver.cs index edaf2b4e..f79d9573 100644 --- a/core/Mock/TopologyTestDriver.cs +++ b/core/Mock/TopologyTestDriver.cs @@ -380,10 +380,9 @@ public IReadOnlyWindowStore GetWindowStore(string name) var store = behavior.GetStateStore(name); if (store is ITimestampedWindowStore) return new ReadOnlyWindowStoreFacade(store as ITimestampedWindowStore); - else if (store is IReadOnlyWindowStore) + if (store is IReadOnlyWindowStore) return (IReadOnlyWindowStore)store; - else - return null; + return null; } #endregion diff --git a/core/State/InMemory/InMemoryKeyValueStore.cs b/core/State/InMemory/InMemoryKeyValueStore.cs index a6c4eae0..43256a05 100644 --- a/core/State/InMemory/InMemoryKeyValueStore.cs +++ b/core/State/InMemory/InMemoryKeyValueStore.cs @@ -6,13 +6,14 @@ using Streamiz.Kafka.Net.State.InMemory.Internal; using System.Collections.Generic; using System.Linq; +using System.Threading; using Microsoft.Extensions.Logging; namespace Streamiz.Kafka.Net.State.InMemory { /// /// implements . - /// This store can be used for development phase. It's not persistent, so be carefull. + /// This store can be used for development phase. It's not persistent, so be careful. /// public class InMemoryKeyValueStore : IKeyValueStore { @@ -20,7 +21,7 @@ public class InMemoryKeyValueStore : IKeyValueStore private BytesComparer bytesComparer; private int size = 0; private readonly ConcurrentDictionary map; - + /// /// Constructor with the store name /// diff --git a/core/State/InMemory/InMemoryWindowStore.cs b/core/State/InMemory/InMemoryWindowStore.cs index 6b37e18e..56ec1697 100644 --- a/core/State/InMemory/InMemoryWindowStore.cs +++ b/core/State/InMemory/InMemoryWindowStore.cs @@ -12,7 +12,6 @@ using Streamiz.Kafka.Net.Metrics; using Streamiz.Kafka.Net.Metrics.Internal; using Streamiz.Kafka.Net.State.Helper; -using Streamiz.Kafka.Net.State.InMemory.Internal; using Streamiz.Kafka.Net.State.Internal; namespace Streamiz.Kafka.Net.State.InMemory @@ -262,8 +261,11 @@ public Windowed PeekNextKey() #endregion - internal class - InMemoryWindowStore : IWindowStore + /// + /// implements . + /// This store can be used for development phase. It's not persistent, so be careful. + /// + public class InMemoryWindowStore : IWindowStore { private readonly TimeSpan retention; private readonly long size; @@ -274,6 +276,8 @@ internal class private long observedStreamTime = -1; private int seqnum = 0; + private Mutex _mutex = new(); + private readonly ConcurrentDictionary> map = new(); private readonly ConcurrentSet openIterators = new(); @@ -302,19 +306,36 @@ private void UpdateSeqNumber() public virtual IKeyValueEnumerator, byte[]> All() { - RemoveExpiredData(); - long minTime = observedStreamTime - (long)retention.TotalMilliseconds; - return CreateNewWindowedKeyValueEnumerator(null, null, Tail(minTime)); + try + { + _mutex.WaitOne(); + RemoveExpiredData(); + long minTime = observedStreamTime - (long)retention.TotalMilliseconds; + return CreateNewWindowedKeyValueEnumerator(null, null, Tail(minTime)); + } + finally + { + _mutex.ReleaseMutex(); + } } public void Close() { - if (openIterators.Count != 0) + try + { + _mutex.WaitOne(); + if (openIterators.Count != 0) + { + logger.LogWarning("Closing {OpenIteratorCount} open iterators for store {Name}", + openIterators.Count, Name); + foreach (var iterator in openIterators) + iterator.Close(); + openIterators.Clear(); + } + } + finally { - logger.LogWarning("Closing {OpenIteratorCount} open iterators for store {Name}", openIterators.Count, Name); - foreach(var iterator in openIterators) - iterator.Close(); - openIterators.Clear(); + _mutex.ReleaseMutex(); } map.Clear(); @@ -323,20 +344,28 @@ public void Close() public virtual byte[] Fetch(Bytes key, long time) { - RemoveExpiredData(); + try + { + _mutex.WaitOne(); + RemoveExpiredData(); - if (time <= observedStreamTime - retention.TotalMilliseconds) - return null; + if (time <= observedStreamTime - retention.TotalMilliseconds) + return null; - if (map.ContainsKey(time)) + if (map.ContainsKey(time)) + { + var keyFrom = retainDuplicates ? WrapWithSeq(key, 0) : key; + var keyTo = retainDuplicates ? WrapWithSeq(key, Int32.MaxValue) : key; + return map[time] + .FirstOrDefault(kv => kv.Key.CompareTo(keyFrom) >= 0 && kv.Key.CompareTo(keyTo) <= 0).Value; + } + else + return null; + } + finally { - var keyFrom = retainDuplicates ? WrapWithSeq(key, 0) : key; - var keyTo = retainDuplicates ? WrapWithSeq(key, Int32.MaxValue) : key; - return map[time] - .FirstOrDefault(kv => kv.Key.CompareTo(keyFrom) >= 0 && kv.Key.CompareTo(keyTo) <= 0).Value; + _mutex.ReleaseMutex(); } - else - return null; } public virtual IWindowStoreEnumerator Fetch(Bytes key, DateTime from, DateTime to) @@ -344,31 +373,48 @@ public virtual IWindowStoreEnumerator Fetch(Bytes key, DateTime from, Da public virtual IWindowStoreEnumerator Fetch(Bytes key, long from, long to) { - RemoveExpiredData(); + try + { + _mutex.WaitOne(); + RemoveExpiredData(); - long minTime = Math.Max(from, observedStreamTime - (long)retention.TotalMilliseconds + 1); + long minTime = Math.Max(from, observedStreamTime - (long)retention.TotalMilliseconds + 1); - if (to < minTime) + if (to < minTime) + { + return new EmptyWindowStoreEnumerator(); + } + + return CreateNewWindowStoreEnumerator(key, SubMap(minTime, to)); + } + finally { - return new EmptyWindowStoreEnumerator(); + _mutex.ReleaseMutex(); } - - return CreateNewWindowStoreEnumerator(key, SubMap(minTime, to)); } public virtual IKeyValueEnumerator, byte[]> FetchAll(DateTime from, DateTime to) { - RemoveExpiredData(); + try + { + _mutex.WaitOne(); + RemoveExpiredData(); - long minTime = Math.Max(from.GetMilliseconds(), observedStreamTime - (long)retention.TotalMilliseconds + 1); + long minTime = Math.Max(from.GetMilliseconds(), + observedStreamTime - (long)retention.TotalMilliseconds + 1); - if (to.GetMilliseconds() < minTime) - { - return new EmptyKeyValueEnumerator, byte[]>(); - } + if (to.GetMilliseconds() < minTime) + { + return new EmptyKeyValueEnumerator, byte[]>(); + } - return CreateNewWindowedKeyValueEnumerator(null, null, SubMap(minTime, to.GetMilliseconds())); + return CreateNewWindowedKeyValueEnumerator(null, null, SubMap(minTime, to.GetMilliseconds())); + } + finally + { + _mutex.ReleaseMutex(); + } } public virtual void Flush() diff --git a/kafka-stream-net.sln b/kafka-stream-net.sln index e776a6aa..5876f08c 100644 --- a/kafka-stream-net.sln +++ b/kafka-stream-net.sln @@ -21,7 +21,7 @@ Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Streamiz.Kafka.Net.SchemaRe EndProject Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Streamiz.Kafka.Net.SchemaRegistry.SerDes.Protobuf", "serdes\Streamiz.Kafka.Net.SchemaRegistry.SerDes.Protobuf\Streamiz.Kafka.Net.SchemaRegistry.SerDes.Protobuf.csproj", "{21CD1F56-F1AF-42BD-877B-43AC36A43793}" EndProject -Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "Crosscutting", "Crosscutting", "{A34792BA-25DD-44A5-B420-9CC810379BFE}" +Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "Metrics", "Metrics", "{A34792BA-25DD-44A5-B420-9CC810379BFE}" EndProject Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Streamiz.Kafka.Net.Metrics.Prometheus", "metrics\Streamiz.Kafka.Net.Metrics.Prometheus\Streamiz.Kafka.Net.Metrics.Prometheus.csproj", "{29CB687B-951E-4B3E-AD1C-13B222490091}" EndProject diff --git a/launcher/sample-stream/Program.cs b/launcher/sample-stream/Program.cs index c35f4564..d4c7195d 100644 --- a/launcher/sample-stream/Program.cs +++ b/launcher/sample-stream/Program.cs @@ -2,6 +2,7 @@ using System; using System.Collections.Generic; using System.Reflection; +using System.Threading; using System.Threading.Tasks; using Confluent.Kafka; using Microsoft.Extensions.Logging; @@ -23,7 +24,7 @@ public static async Task Main(string[] args) Logger = LoggerFactory.Create((b) => { b.AddConsole(); - b.SetMinimumLevel(LogLevel.Debug); + b.SetMinimumLevel(LogLevel.Information); }) }; @@ -35,20 +36,29 @@ public static async Task Main(string[] args) }; await stream.StartAsync(); - } + } private static Topology BuildTopology() { var builder = new StreamBuilder(); + TimeSpan _windowSizeMs = TimeSpan.FromSeconds(5); + + var materializer + = InMemoryWindows + .As("agg-store", _windowSizeMs) + .WithKeySerdes(new StringSerDes()) + .WithValueSerdes(new Int64SerDes()) + .WithRetention(_windowSizeMs); + builder.Stream("input2") .GroupByKey() .WindowedBy(TumblingWindowOptions.Of(TimeSpan.FromMinutes(1))) - .Count() - .Suppress(SuppressedBuilder.UntilWindowClose, long>(TimeSpan.Zero, + .Count(materializer) + /*.Suppress(SuppressedBuilder.UntilWindowClose, long>(TimeSpan.Zero, StrictBufferConfig.Unbounded()) .WithKeySerdes(new TimeWindowedSerDes(new StringSerDes(), (long)TimeSpan.FromMinutes(1).TotalMilliseconds))) - .ToStream() + */.ToStream() .Map((k,v, r) => new KeyValuePair(k.Key, v)) .To("output2"); diff --git a/test/Streamiz.Kafka.Net.Tests/TestDriver/Issue212Tests.cs b/test/Streamiz.Kafka.Net.Tests/TestDriver/Issue212Tests.cs index a3531c05..9984cf46 100644 --- a/test/Streamiz.Kafka.Net.Tests/TestDriver/Issue212Tests.cs +++ b/test/Streamiz.Kafka.Net.Tests/TestDriver/Issue212Tests.cs @@ -11,19 +11,19 @@ public class Issue212Tests { private static readonly JsonSerializerOptions _jsonSerializerOptions = new(); - public class Tag + private class Tag { public string Field1 { get; set; } public string Field2 { get; set; } public string Field3 { get; set; } } - public class Metadata + private class Metadata { public Dictionary Data { get; set; } } - public class TagInfo + private class TagInfo { public Tag Tag { get; set; } public Metadata MetaData { get; set; } diff --git a/test/Streamiz.Kafka.Net.Tests/TestDriver/Issue382Tests.cs b/test/Streamiz.Kafka.Net.Tests/TestDriver/Issue382Tests.cs new file mode 100644 index 00000000..e2e5f772 --- /dev/null +++ b/test/Streamiz.Kafka.Net.Tests/TestDriver/Issue382Tests.cs @@ -0,0 +1,99 @@ +using System; +using System.Collections.Generic; +using System.Threading.Tasks; +using NUnit.Framework; +using Streamiz.Kafka.Net.Mock; +using Streamiz.Kafka.Net.SerDes; +using Streamiz.Kafka.Net.State; +using Streamiz.Kafka.Net.Stream; +using Streamiz.Kafka.Net.Table; + +namespace Streamiz.Kafka.Net.Tests.TestDriver; + +public class Issue382Tests +{ + private class Tag + { + public string Field1 { get; set; } + public string Field2 { get; set; } + public string Field3 { get; set; } + } + + [Test] + public void Reproducer() + { + List Aggregator(string key, Tag value, List aggValues) + { + if(!aggValues.Contains(value)) + aggValues.Add(value); + return aggValues; + } + + var streamConfig = new StreamConfig>(); + streamConfig.ApplicationId = "test-reproducer-issue382"; + + TimeSpan _windowSizeMs = TimeSpan.FromSeconds(5); + + var builder = new StreamBuilder(); + var materializer + = InMemoryWindows + .As>("tags-agg-store", _windowSizeMs) + .WithKeySerdes(new StringSerDes()) + .WithValueSerdes(new JsonSerDes>()) + .WithRetention(_windowSizeMs); + + builder.Stream("tags") + .GroupByKey() + .WindowedBy(TumblingWindowOptions.Of(_windowSizeMs)) + .Aggregate(() => new List(), Aggregator, materializer); + + + var topology = builder.Build(); + + using var driver = new TopologyTestDriver(topology, streamConfig, TopologyTestDriver.Mode.ASYNC_CLUSTER_IN_MEMORY); + var inputTopic = driver.CreateInputTopic>("tags"); + bool @continue = true; + + var task = Task.Factory.StartNew(() => { + while(@continue) + { + try + { + DateTime now = DateTime.Now; + var windowStore = driver.GetWindowStore>("tags-agg-store"); + var items = windowStore?.FetchAll( + now.Subtract(_windowSizeMs), + now.Add(_windowSizeMs)) + .ToList(); + } + catch + { + // nothing + } + } + }); + + var taskProducer = Task.Factory.StartNew(() => + { + DateTime now = DateTime.Now; + var rd = new Random(); + while (now.AddSeconds(20) > DateTime.Now) + { + inputTopic.PipeInput( + $"key{rd.NextInt64(1000)}", + new Tag { + Field1 = $"tag{rd.NextInt64(500)}", + Field2 = $"tag{rd.NextInt64(500)}", + Field3 = $"tag{rd.NextInt64(500)}" + }); + } + }); + + taskProducer.Wait(); + @continue = false; + task?.Wait(); + + Assert.IsTrue(driver.IsRunning); + Assert.IsFalse(driver.IsError); + } +} \ No newline at end of file