Skip to content

Commit

Permalink
Merge pull request #384 from LGouellec/fix/concurrent-issue-window
Browse files Browse the repository at this point in the history
#382 - Fix InMemoryWindowStore concurrent issue
  • Loading branch information
LGouellec authored Nov 9, 2024
2 parents 509076f + 7882a92 commit 1ecdfaa
Show file tree
Hide file tree
Showing 9 changed files with 315 additions and 49 deletions.
8 changes: 7 additions & 1 deletion core/Mock/ClusterInMemoryTopologyDriver.cs
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
using System.IO;
using System.Threading;
using Streamiz.Kafka.Net.Metrics;
using Streamiz.Kafka.Net.State;
using Streamiz.Kafka.Net.Stream.Internal;

namespace Streamiz.Kafka.Net.Mock
Expand Down Expand Up @@ -169,17 +170,22 @@ public void Dispose()

public IStateStore GetStateStore<K, V>(string name)
{
bool windowStore = false;
IList<IStateStore> stores = new List<IStateStore>();
foreach (var task in threadTopology.ActiveTasks)
{
var store = task.GetStore(name);
if (store != null)
{
if (store is IReadOnlyWindowStore<K, V> || store is ITimestampedWindowStore<K, V>)
windowStore = true;
stores.Add(store);
}
}

return stores.Count > 0 ? new MockReadOnlyKeyValueStore<K, V>(stores) : null;
return stores.Count > 0 ?
!windowStore ? new MockReadOnlyKeyValueStore<K, V>(stores) : new MockReadOnlyWindowStore<K, V>(stores)
: null;
}

public void StartDriver()
Expand Down
105 changes: 105 additions & 0 deletions core/Mock/Kafka/MockReadOnlyWindowStore.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,105 @@
using System;
using System.Collections.Generic;
using System.Linq;
using Streamiz.Kafka.Net.Crosscutting;
using Streamiz.Kafka.Net.Processors;
using Streamiz.Kafka.Net.State;
using Streamiz.Kafka.Net.State.Enumerator;
using Streamiz.Kafka.Net.State.Internal;

namespace Streamiz.Kafka.Net.Mock.Kafka
{
internal class MockReadOnlyWindowStore<K, V> : IStateStore, IReadOnlyWindowStore<K, V>
{
private readonly IEnumerable<IStateStore> stores;

public string Name => "MockWindowStore";
public bool Persistent => false;
public bool IsLocally => true;

public bool IsOpen => true;

public MockReadOnlyWindowStore(IEnumerable<IStateStore> stores)
{
this.stores = stores;
}

private IEnumerable<IReadOnlyWindowStore<K, V>> GetAllStores()
{
var readonlystores = stores
.OfType<IReadOnlyWindowStore<K, V>>()
.ToList();

var timestamp = stores
.OfType<ITimestampedWindowStore<K, V>>()
.Select(s => new ReadOnlyWindowStoreFacade<K, V>(s));

readonlystores.AddRange(timestamp);
return readonlystores;
}

public void Init(ProcessorContext context, IStateStore root)
{
context.Register(root, null);
}

public void Flush()
{
// NOTHING
}

public void Close()
{
// NOTHING
}

public V Fetch(K key, long time)
{
foreach (var store in GetAllStores())
{
var result = store.Fetch(key, time);
if (result != null)
{
return result;
}
}

return default;
}

public IWindowStoreEnumerator<V> Fetch(K key, DateTime from, DateTime to)
=> Fetch(key, from.GetMilliseconds(), to.GetMilliseconds());

public IWindowStoreEnumerator<V> Fetch(K key, long from, long to)
{
foreach (var store in GetAllStores())
{
var it = store.Fetch(key, from, to);
if (!it.MoveNext())
{
it.Dispose();
}
else
{
it.Reset();
return it;
}
}
return new EmptyWindowStoreEnumerator<V>();
}

public IKeyValueEnumerator<Windowed<K>, V> All()
{
return new CompositeKeyValueEnumerator<Windowed<K>, V, IReadOnlyWindowStore<K, V>>(
GetAllStores(),
(store) => store.All());
}

public IKeyValueEnumerator<Windowed<K>, V> FetchAll(DateTime from, DateTime to)
{
return new CompositeKeyValueEnumerator<Windowed<K>, V, IReadOnlyWindowStore<K, V>>(
GetAllStores(),
(store) => store.FetchAll(from, to));
}
}
}
5 changes: 2 additions & 3 deletions core/Mock/TopologyTestDriver.cs
Original file line number Diff line number Diff line change
Expand Up @@ -380,10 +380,9 @@ public IReadOnlyWindowStore<K, V> GetWindowStore<K, V>(string name)
var store = behavior.GetStateStore<K, V>(name);
if (store is ITimestampedWindowStore<K, V>)
return new ReadOnlyWindowStoreFacade<K, V>(store as ITimestampedWindowStore<K, V>);
else if (store is IReadOnlyWindowStore<K, V>)
if (store is IReadOnlyWindowStore<K, V>)
return (IReadOnlyWindowStore<K, V>)store;
else
return null;
return null;
}

#endregion
Expand Down
5 changes: 3 additions & 2 deletions core/State/InMemory/InMemoryKeyValueStore.cs
Original file line number Diff line number Diff line change
Expand Up @@ -6,21 +6,22 @@
using Streamiz.Kafka.Net.State.InMemory.Internal;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using Microsoft.Extensions.Logging;

namespace Streamiz.Kafka.Net.State.InMemory
{
/// <summary>
/// <see cref="InMemoryKeyValueStore"/> implements <see cref="IKeyValueStore{K, V}"/>.
/// This store can be used for development phase. It's not persistent, so be carefull.
/// This store can be used for development phase. It's not persistent, so be careful.
/// </summary>
public class InMemoryKeyValueStore : IKeyValueStore<Bytes, byte[]>
{
private static readonly ILogger log = Logger.GetLogger(typeof(InMemoryKeyValueStore));
private BytesComparer bytesComparer;
private int size = 0;
private readonly ConcurrentDictionary<Bytes, byte[]> map;

/// <summary>
/// Constructor with the store name
/// </summary>
Expand Down
114 changes: 80 additions & 34 deletions core/State/InMemory/InMemoryWindowStore.cs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@
using Streamiz.Kafka.Net.Metrics;
using Streamiz.Kafka.Net.Metrics.Internal;
using Streamiz.Kafka.Net.State.Helper;
using Streamiz.Kafka.Net.State.InMemory.Internal;
using Streamiz.Kafka.Net.State.Internal;

namespace Streamiz.Kafka.Net.State.InMemory
Expand Down Expand Up @@ -262,8 +261,11 @@ public Windowed<Bytes> PeekNextKey()

#endregion

internal class
InMemoryWindowStore : IWindowStore<Bytes, byte[]>
/// <summary>
/// <see cref="InMemoryWindowStore"/> implements <see cref="IWindowStore{K, V}"/>.
/// This store can be used for development phase. It's not persistent, so be careful.
/// </summary>
public class InMemoryWindowStore : IWindowStore<Bytes, byte[]>
{
private readonly TimeSpan retention;
private readonly long size;
Expand All @@ -274,6 +276,8 @@ internal class
private long observedStreamTime = -1;
private int seqnum = 0;

private Mutex _mutex = new();

private readonly ConcurrentDictionary<long, ConcurrentDictionary<Bytes, byte[]>> map = new();
private readonly ConcurrentSet<InMemoryWindowStoreEnumeratorWrapper> openIterators = new();

Expand Down Expand Up @@ -302,19 +306,36 @@ private void UpdateSeqNumber()

public virtual IKeyValueEnumerator<Windowed<Bytes>, byte[]> All()
{
RemoveExpiredData();
long minTime = observedStreamTime - (long)retention.TotalMilliseconds;
return CreateNewWindowedKeyValueEnumerator(null, null, Tail(minTime));
try
{
_mutex.WaitOne();
RemoveExpiredData();
long minTime = observedStreamTime - (long)retention.TotalMilliseconds;
return CreateNewWindowedKeyValueEnumerator(null, null, Tail(minTime));
}
finally
{
_mutex.ReleaseMutex();
}
}

public void Close()
{
if (openIterators.Count != 0)
try
{
_mutex.WaitOne();
if (openIterators.Count != 0)
{
logger.LogWarning("Closing {OpenIteratorCount} open iterators for store {Name}",
openIterators.Count, Name);
foreach (var iterator in openIterators)
iterator.Close();
openIterators.Clear();
}
}
finally
{
logger.LogWarning("Closing {OpenIteratorCount} open iterators for store {Name}", openIterators.Count, Name);
foreach(var iterator in openIterators)
iterator.Close();
openIterators.Clear();
_mutex.ReleaseMutex();
}

map.Clear();
Expand All @@ -323,52 +344,77 @@ public void Close()

public virtual byte[] Fetch(Bytes key, long time)
{
RemoveExpiredData();
try
{
_mutex.WaitOne();
RemoveExpiredData();

if (time <= observedStreamTime - retention.TotalMilliseconds)
return null;
if (time <= observedStreamTime - retention.TotalMilliseconds)
return null;

if (map.ContainsKey(time))
if (map.ContainsKey(time))
{
var keyFrom = retainDuplicates ? WrapWithSeq(key, 0) : key;
var keyTo = retainDuplicates ? WrapWithSeq(key, Int32.MaxValue) : key;
return map[time]
.FirstOrDefault(kv => kv.Key.CompareTo(keyFrom) >= 0 && kv.Key.CompareTo(keyTo) <= 0).Value;
}
else
return null;
}
finally
{
var keyFrom = retainDuplicates ? WrapWithSeq(key, 0) : key;
var keyTo = retainDuplicates ? WrapWithSeq(key, Int32.MaxValue) : key;
return map[time]
.FirstOrDefault(kv => kv.Key.CompareTo(keyFrom) >= 0 && kv.Key.CompareTo(keyTo) <= 0).Value;
_mutex.ReleaseMutex();
}
else
return null;
}

public virtual IWindowStoreEnumerator<byte[]> Fetch(Bytes key, DateTime from, DateTime to)
=> Fetch(key, from.GetMilliseconds(), to.GetMilliseconds());

public virtual IWindowStoreEnumerator<byte[]> Fetch(Bytes key, long from, long to)
{
RemoveExpiredData();
try
{
_mutex.WaitOne();
RemoveExpiredData();

long minTime = Math.Max(from, observedStreamTime - (long)retention.TotalMilliseconds + 1);
long minTime = Math.Max(from, observedStreamTime - (long)retention.TotalMilliseconds + 1);


if (to < minTime)
if (to < minTime)
{
return new EmptyWindowStoreEnumerator<byte[]>();
}

return CreateNewWindowStoreEnumerator(key, SubMap(minTime, to));
}
finally
{
return new EmptyWindowStoreEnumerator<byte[]>();
_mutex.ReleaseMutex();
}

return CreateNewWindowStoreEnumerator(key, SubMap(minTime, to));
}

public virtual IKeyValueEnumerator<Windowed<Bytes>, byte[]> FetchAll(DateTime from, DateTime to)
{
RemoveExpiredData();
try
{
_mutex.WaitOne();
RemoveExpiredData();

long minTime = Math.Max(from.GetMilliseconds(), observedStreamTime - (long)retention.TotalMilliseconds + 1);
long minTime = Math.Max(from.GetMilliseconds(),
observedStreamTime - (long)retention.TotalMilliseconds + 1);

if (to.GetMilliseconds() < minTime)
{
return new EmptyKeyValueEnumerator<Windowed<Bytes>, byte[]>();
}
if (to.GetMilliseconds() < minTime)
{
return new EmptyKeyValueEnumerator<Windowed<Bytes>, byte[]>();
}

return CreateNewWindowedKeyValueEnumerator(null, null, SubMap(minTime, to.GetMilliseconds()));
return CreateNewWindowedKeyValueEnumerator(null, null, SubMap(minTime, to.GetMilliseconds()));
}
finally
{
_mutex.ReleaseMutex();
}
}

public virtual void Flush()
Expand Down
2 changes: 1 addition & 1 deletion kafka-stream-net.sln
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Streamiz.Kafka.Net.SchemaRe
EndProject
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Streamiz.Kafka.Net.SchemaRegistry.SerDes.Protobuf", "serdes\Streamiz.Kafka.Net.SchemaRegistry.SerDes.Protobuf\Streamiz.Kafka.Net.SchemaRegistry.SerDes.Protobuf.csproj", "{21CD1F56-F1AF-42BD-877B-43AC36A43793}"
EndProject
Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "Crosscutting", "Crosscutting", "{A34792BA-25DD-44A5-B420-9CC810379BFE}"
Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "Metrics", "Metrics", "{A34792BA-25DD-44A5-B420-9CC810379BFE}"
EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Streamiz.Kafka.Net.Metrics.Prometheus", "metrics\Streamiz.Kafka.Net.Metrics.Prometheus\Streamiz.Kafka.Net.Metrics.Prometheus.csproj", "{29CB687B-951E-4B3E-AD1C-13B222490091}"
EndProject
Expand Down
Loading

0 comments on commit 1ecdfaa

Please sign in to comment.