Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix Issue/314 #345

Merged
merged 2 commits into from
Jul 10, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 7 additions & 5 deletions core/State/InMemory/InMemoryWindowStore.cs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
using Streamiz.Kafka.Net.Metrics;
using Streamiz.Kafka.Net.Metrics.Internal;
using Streamiz.Kafka.Net.State.Helper;
using Streamiz.Kafka.Net.State.InMemory.Internal;

namespace Streamiz.Kafka.Net.State.InMemory
{
Expand Down Expand Up @@ -260,7 +261,8 @@ public Windowed<Bytes> PeekNextKey()

#endregion

internal class InMemoryWindowStore : IWindowStore<Bytes, byte[]>
internal class
InMemoryWindowStore : IWindowStore<Bytes, byte[]>
{
private readonly TimeSpan retention;
private readonly long size;
Expand All @@ -272,8 +274,7 @@ internal class InMemoryWindowStore : IWindowStore<Bytes, byte[]>
private int seqnum = 0;

private readonly ConcurrentDictionary<long, ConcurrentDictionary<Bytes, byte[]>> map = new();

private readonly ISet<InMemoryWindowStoreEnumeratorWrapper> openIterators = new HashSet<InMemoryWindowStoreEnumeratorWrapper>();
private readonly ConcurrentSet<InMemoryWindowStoreEnumeratorWrapper> openIterators = new();

private readonly ILogger logger = Logger.GetLogger(typeof(InMemoryWindowStore));

Expand Down Expand Up @@ -309,8 +310,9 @@ public void Close()
if (openIterators.Count != 0)
{
logger.LogWarning("Closing {OpenIteratorCount} open iterators for store {Name}", openIterators.Count, Name);
for (int i = 0; i< openIterators.Count; ++i)
openIterators.ElementAt(i).Close();
foreach(var iterator in openIterators)
iterator.Close();
openIterators.Clear();
}

map.Clear();
Expand Down
71 changes: 71 additions & 0 deletions core/State/InMemory/Internal/ConcurrentHashSet.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
using System.Collections.Concurrent;
using System.Collections.Generic;

namespace Streamiz.Kafka.Net.State.InMemory.Internal
{
internal class ConcurrentSet<T>
{
    // Backing store: the dictionary keys hold the set members; the byte values
    // are unused placeholders (ConcurrentDictionary provides the thread safety).
    private readonly ConcurrentDictionary<T, byte> _store = new();

    /// <summary>
    /// Gets the number of elements currently contained in the set.
    /// </summary>
    public int Count => _store.Count;

    /// <summary>
    /// Adds an element to the set.
    /// </summary>
    /// <param name="item">The element to add.</param>
    /// <returns>true if the element was added; false if it was already present.</returns>
    public bool Add(T item) => _store.TryAdd(item, default);

    /// <summary>
    /// Removes the given element from the set.
    /// </summary>
    /// <param name="item">The element to remove.</param>
    /// <returns>true if the element was found and removed; otherwise false.</returns>
    public bool Remove(T item) => _store.TryRemove(item, out _);

    /// <summary>
    /// Determines whether the set contains the given element.
    /// </summary>
    /// <param name="item">The element to look up.</param>
    /// <returns>true if the element is present; otherwise false.</returns>
    public bool Contains(T item) => _store.ContainsKey(item);

    /// <summary>
    /// Removes all elements from the set.
    /// </summary>
    public void Clear() => _store.Clear();

    /// <summary>
    /// Returns an enumerator over the set's elements
    /// (backed by <see cref="ConcurrentDictionary{TKey,TValue}.Keys"/>, which is
    /// safe to enumerate while other threads mutate the set).
    /// </summary>
    /// <returns>An <see cref="T:System.Collections.Generic.IEnumerator`1"/> for the current elements.</returns>
    public IEnumerator<T> GetEnumerator() => _store.Keys.GetEnumerator();
}

}
14 changes: 14 additions & 0 deletions environment/datagen_connector.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
{
"name": "datagen-users",
"config": {
"connector.class": "io.confluent.kafka.connect.datagen.DatagenConnector",
"kafka.topic": "users",
"quickstart": "users",
"key.converter": "org.apache.kafka.connect.storage.StringConverter",
"value.converter": "org.apache.kafka.connect.json.JsonConverter",
"value.converter.schemas.enable": "false",
"max.interval": 1000,
"iterations": 10000000,
"tasks.max": "1"
}
}
8 changes: 4 additions & 4 deletions environment/docker-compose-with-connect.yml
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
version: '2'
services:
zookeeper:
image: confluentinc/cp-zookeeper:7.4.0
image: confluentinc/cp-zookeeper:7.6.1
hostname: zookeeper
container_name: zookeeper
ports:
Expand All @@ -13,7 +13,7 @@ services:
KAFKA_OPTS: "-Dzookeeper.4lw.commands.whitelist=*"

broker:
image: confluentinc/cp-server:7.4.0
image: confluentinc/cp-server:7.6.1
hostname: broker
container_name: broker
depends_on:
Expand Down Expand Up @@ -42,7 +42,7 @@ services:
CONFLUENT_SUPPORT_CUSTOMER_ID: 'anonymous'

schema-registry:
image: confluentinc/cp-schema-registry:7.4.0
image: confluentinc/cp-schema-registry:7.6.1
hostname: schema-registry
container_name: schema-registry
depends_on:
Expand All @@ -55,7 +55,7 @@ services:
SCHEMA_REGISTRY_LISTENERS: http://0.0.0.0:8081

connect:
image: cnfldemos/kafka-connect-datagen:0.6.0-7.3.0
image: cnfldemos/kafka-connect-datagen:0.6.4-7.6.0
container_name: connect
depends_on:
- broker
Expand Down
2 changes: 1 addition & 1 deletion samples/sample-stream/Program.cs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ namespace sample_stream
{
public static class Program
{
public static async Task Main(string[] args)
public static async Task Main2(string[] args)
{
var config = new StreamConfig<StringSerDes, StringSerDes>{
ApplicationId = $"test-app",
Expand Down
75 changes: 75 additions & 0 deletions samples/sample-stream/Reproducer314.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
using System;
using System.Threading;
using System.Threading.Tasks;
using Confluent.Kafka;
using Streamiz.Kafka.Net;
using Streamiz.Kafka.Net.SerDes;
using Streamiz.Kafka.Net.State;
using Streamiz.Kafka.Net.Stream;
using Streamiz.Kafka.Net.Table;

namespace sample_stream;

public class Reproducer314
{
    /// <summary>
    /// Reproduction scenario for issue #314: builds a topology with an in-memory
    /// windowed store fed from the "users" topic, starts the stream, then
    /// repeatedly queries the store while the stream keeps writing to it.
    /// </summary>
    public static async Task Main(string[] args)
    {
        Console.WriteLine("Hello Streams");

        var config = new StreamConfig<StringSerDes, StringSerDes>
        {
            ApplicationId = $"test-windowedtable-bis",
            BootstrapServers = "localhost:9092",
            AutoOffsetReset = AutoOffsetReset.Earliest
        };

        var topology = CreateWindowedStore().Build();
        var stream = new KafkaStream(topology, config);

        await stream.StartAsync();

        // Give the store time to be restored and become queryable before querying it.
        Thread.Sleep(10000);

        GetValueFromWindowedStore(stream, DateTime.UtcNow.AddHours(-1), new CancellationToken());

        Console.WriteLine("Finished");
    }

    // Polls the windowed store in a tight loop (deliberate: maximizes concurrent
    // access to open iterators for the repro) and prints every record found.
    private static void GetValueFromWindowedStore(KafkaStream stream, DateTime lookupStartUtc, CancellationToken token)
    {
        var store = stream.Store(StoreQueryParameters.FromNameAndType("store", QueryableStoreTypes.WindowStore<string, int>()));

        while (!token.IsCancellationRequested)
        {
            var records = store.FetchAll(lookupStartUtc, DateTime.UtcNow).ToList();

            if (records.Count == 0)
                continue;

            foreach (var record in records)
                Console.WriteLine($"Value from windowed store : KEY = {record.Key} VALUE = {record.Value}");

            lookupStartUtc = DateTime.UtcNow;
        }
    }

    // Topology: tumbling 60s windows aggregating the max value length per key
    // into the in-memory window store named "store".
    private static StreamBuilder CreateWindowedStore()
    {
        var builder = new StreamBuilder();

        builder
            .Stream<string, string>("users")
            .GroupByKey()
            .WindowedBy(TumblingWindowOptions.Of(60000))
            .Aggregate(
                () => 0,
                (k, v, agg) => Math.Max(v.Length, agg),
                InMemoryWindows.As<string, int>("store").WithValueSerdes<Int32SerDes>());

        return builder;
    }
}
93 changes: 93 additions & 0 deletions test/Streamiz.Kafka.Net.Tests/Private/ConcurrentSetTests.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,93 @@
using System;
using System.Collections.Generic;
using System.Threading.Tasks;
using NUnit.Framework;
using Streamiz.Kafka.Net.State.InMemory.Internal;

namespace Streamiz.Kafka.Net.Tests.Private;

public class ConcurrentSetTests
{
    // System under test; recreated fresh for each test case.
    private ConcurrentSet<string> sut;

    [SetUp]
    public void Init()
    {
        sut = new();
    }

    [TearDown]
    public void Dispose()
    {
        sut.Clear();
    }

    // All concurrent adds of distinct values must be retained.
    [TestCase(1000)]
    public void ConcurrencyAdded(int numberTasks)
    {
        var pending = new List<Task>();
        for (var i = 0; i < numberTasks; ++i)
            pending.Add(Task.Run(() => sut.Add(Guid.NewGuid().ToString())));

        Task.WaitAll(pending.ToArray());

        Assert.AreEqual(numberTasks, sut.Count);
    }

    // Concurrent removal of every pre-inserted element must empty the set.
    [TestCase(1000)]
    public void ConcurrencyRemoved(int numberTasks)
    {
        for (var i = 0; i < numberTasks; ++i)
            sut.Add(i.ToString());

        var pending = new List<Task>();
        for (var i = 0; i < numberTasks; ++i)
        {
            // Capture a per-iteration copy; the for-loop variable is shared across iterations.
            var value = i.ToString();
            pending.Add(Task.Run(() => sut.Remove(value)));
        }

        Task.WaitAll(pending.ToArray());

        Assert.AreEqual(0, sut.Count);
    }

    // Enumerating while other tasks add concurrently must not throw.
    [TestCase(10000)]
    public void ConcurrencyAddedAndForeach(int numberTasks)
    {
        var pending = new List<Task>();
        for (var i = 0; i < numberTasks; ++i)
        {
            pending.Add(Task.Run(() =>
            {
                sut.Add(Guid.NewGuid().ToString());
                foreach (var _ in sut)
                    ;
            }));
        }

        Task.WaitAll(pending.ToArray());

        Assert.AreEqual(numberTasks, sut.Count);
    }

    // An element must be visible to Contains immediately after its Add.
    [TestCase(10000)]
    public void ConcurrencyAddedAndContains(int numberTasks)
    {
        var pending = new List<Task>();
        for (var i = 0; i < numberTasks; ++i)
        {
            pending.Add(Task.Run(() =>
            {
                var value = Guid.NewGuid().ToString();
                sut.Add(value);
                Assert.IsTrue(sut.Contains(value));
            }));
        }

        Task.WaitAll(pending.ToArray());

        Assert.AreEqual(numberTasks, sut.Count);
    }

}
Loading