
Windowed KTables appear to have a concurrency issue. #382

Closed · AntonyLittle opened this issue Oct 31, 2024 · 9 comments · Fixed by #384
@AntonyLittle

Description

I am seeing the following exception when using Windowed KTables in Streamiz 1.6.0:

fail: Streamiz.Kafka.Net.Processors.StreamThread[0]
      stream-thread[urn.menzies.falcona.interface_tableruntobarcodestable-1424c242-4ed5-48a9-927f-c3e6b8abe430-stream-thread-0] Encountered the following error during processing:
      System.InvalidOperationException: Collection was modified; enumeration operation may not execute.
         at System.Collections.Generic.HashSet`1.Enumerator.MoveNext()
         at Streamiz.Kafka.Net.State.InMemory.InMemoryWindowStore.RemoveExpiredData()
         at Streamiz.Kafka.Net.State.InMemory.InMemoryWindowStore.Fetch(Bytes key, Int64 time)
         at Streamiz.Kafka.Net.State.Metered.MeteredWindowStore`2.<>c__DisplayClass20_0.<Fetch>b__0()
         at Streamiz.Kafka.Net.Crosscutting.ActionHelper.MeasureLatency[T](Func`1 actionToMeasure, Sensor sensor)
         at Streamiz.Kafka.Net.Processors.KStreamWindowAggregateProcessor`4.Process(K key, V value)
         at Streamiz.Kafka.Net.Processors.AbstractProcessor`2.Forward(IEnumerable`1 processors, Action`1 action)
         at Streamiz.Kafka.Net.Processors.AbstractProcessor`2.Forward(K key, V value)
         at Streamiz.Kafka.Net.Processors.AbstractProcessor`2.Process(ConsumeResult`2 record)
         at Streamiz.Kafka.Net.Processors.StreamTask.Process()
         at Streamiz.Kafka.Net.Processors.Internal.TaskManager.Process(Int64 now)
         at Streamiz.Kafka.Net.Crosscutting.ActionHelper.MeasureLatency(Action action)
         at Streamiz.Kafka.Net.Processors.StreamThread.Run()

How to reproduce

Create the materializer as follows:

        var materializer
            = InMemoryWindows
                .As<TGroupedKey, TValueResult>(tableName)
                .WithKeySerdes((ISerDes<TGroupedKey>)SerDesHelper.GetSerDes<TGroupedKey>(KafkaConfiguration))
                .WithValueSerdes((ISerDes<TValueResult>)SerDesHelper.GetSerDes<TValueResult>(KafkaConfiguration))
                .WithRetention(TimeSpan.FromMilliseconds(_windowSizeMs));

The topology is created like so:


        builder.Stream<TKey, TValue>(TopicName)
            .GroupBy(Grouper)
            .Aggregate(Initializer, Aggregator, materializer);
        var topology = builder.Build();

Access the store as follows:


        _windowStore ??= TableStream.Store(
            StoreQueryParameters.FromNameAndType(Name, QueryableStoreTypes.WindowStore<TGroupedKey, TValueResult>()));
        _windowStore
                .Fetch(
                    key,
                    now.Subtract(TimeSpan.FromMilliseconds(_windowSizeMs + _windowAdvanceMs)),
                    now.Add(TimeSpan.FromMilliseconds(_windowSizeMs)))
                .ToList();

-- OR --

        _windowStore
                .FetchAll(
                    now.Subtract(TimeSpan.FromMilliseconds(_windowSizeMs + _windowAdvanceMs)),
                    now.Add(TimeSpan.FromMilliseconds(_windowSizeMs)))
                .ToList();

The issue does not occur every time; it appears only under large volumes of data. I suspect it is caused by the Streamiz framework calling Fetch() (which in turn triggers RemoveExpiredData()) at the same time as our own reads.
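For reference, the exception in the stack trace is the standard .NET guard against mutating a collection while it is being enumerated. The following minimal sketch (our own illustration, not Streamiz code) reproduces the same failure mode deterministically on a single thread, which is what a concurrent writer can trigger non-deterministically across threads:

```csharp
using System;
using System.Collections.Generic;

class EnumerationRaceDemo
{
    static void Main()
    {
        // A plain HashSet, like the window sets backing InMemoryWindowStore.
        var openWindows = new HashSet<long> { 100, 200, 300 };
        try
        {
            // Reader: enumerating the set, as Fetch()/RemoveExpiredData() does.
            foreach (var w in openWindows)
            {
                // Writer: mutating the same set mid-enumeration, as a
                // concurrent Process()/Fetch() pair can do across threads.
                openWindows.Add(w + 1000);
            }
        }
        catch (InvalidOperationException e)
        {
            // "Collection was modified; enumeration operation may not execute."
            Console.WriteLine(e.GetType().Name);
        }
    }
}
```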

Checklist

Please provide the following information:

  • A complete (i.e. we can run it), minimal program demonstrating the problem. No need to supply a project file.
  • A code snippet with your topology builder (ex: builder.Stream<string, string>("topic").To("another-topic");)
  • Streamiz.Kafka.Net nuget version.
  • Apache Kafka version.
  • Client configuration.
  • Operating system.
  • Logs (in debug mode, using log4net and StreamConfig.Debug, as needed).
  • Critical issue.
@LGouellec LGouellec added the bug Something isn't working label Oct 31, 2024
@LGouellec LGouellec added this to the 1.7.0 milestone Oct 31, 2024
@LGouellec LGouellec self-assigned this Oct 31, 2024
@LGouellec (Owner)

Hey @AntonyLittle,

Thank you for your issue and your interest in Streamiz.
I will try to reproduce the issue and fix it as soon as possible.

Quick question: if you have a large amount of data, why not choose RocksDb as a persistent storage layer?

Best regards,

@LGouellec LGouellec linked a pull request Nov 4, 2024 that will close this issue
@LGouellec LGouellec removed a link to a pull request Nov 4, 2024
@LGouellec (Owner)

@AntonyLittle ,

Are you able to test with this specific branch? https://github.com/LGouellec/streamiz/tree/fix/concurrent-issue-window

Thanks,

@AntonyLittle (Author)

Certainly! We've also had some success adding mutex locking on our side, but we are still testing it. Hopefully your fix will do the job :)
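For anyone stuck on 1.6.0 before the fix ships, a user-side mitigation along the lines we tried can be sketched as below. The helper name and retry count are our own invention, not Streamiz API: it serializes our reads behind a lock and retries when the store's internal eviction wins the race.

```csharp
using System;

static class StoreReadGuard
{
    private static readonly object _gate = new object();

    // Hypothetical helper (not Streamiz API): runs a store read such as
    // () => _windowStore.FetchAll(from, to).ToList() and retries if the
    // in-memory store is mutated underneath the enumeration.
    public static T ReadWithRetry<T>(Func<T> read, int attempts = 3)
    {
        InvalidOperationException last = null;
        for (var i = 0; i < attempts; i++)
        {
            try
            {
                // Serialize our own readers against each other.
                lock (_gate) { return read(); }
            }
            catch (InvalidOperationException e)
            {
                last = e; // store changed mid-enumeration; try again
            }
        }
        throw last;
    }
}
```

Note this cannot fully close the race, since Streamiz's internal eviction does not take the same lock; the retry is what makes it workable in practice.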

@AntonyLittle (Author)

@LGouellec Apologies for the delay; we've had some urgent issues to deal with. We should be able to report back on the status of your fix today or tomorrow.

@LGouellec (Owner)

@AntonyLittle No worries at all ;)

@AntonyLittle (Author)

Can confirm that your changes have fixed the issue. Thank you! Are you able to provide a nuget release?

@LGouellec (Owner)

Awesome @AntonyLittle! This fix will be part of the 1.7.0 release, coming soon.

@LGouellec LGouellec linked a pull request Nov 8, 2024 that will close this issue
LGouellec added a commit that referenced this issue Nov 9, 2024
#382 - Fix InMemoryWindowStore concurrent issue
@AntonyLittle (Author)

> Awesome @AntonyLittle! This fix will be part of the 1.7.0 release, coming soon.

Do you know roughly how soon "soon" might be? Is there any chance of you doing a 1.6.1 release with just the fix added?

@LGouellec (Owner)

@AntonyLittle
It will be before the end of November. I'll publish a 1.7.0-RC1 containing this fix within a week.
