I ran into an edge case when using the shapeless extensions with Flow.flatMapConcat, where one of the flows in the coproductFlow had a broadcast and merge in it.
It seems the CoproductFlexiMerge does not handle well the case where a sub flow pushes more than one value to its inlets. In this setup I think it incorrectly marks itself as complete after pulling a single value from the inlet and never processes the second output of my internal broadcast/merge graph. In a flow without flatMapConcat this wasn't a problem, because the graph just kept running until all data was processed, but flatMapConcat seems to shut down the subgraph as soon as it receives the upStreamComplete message, so data can be lost. Sorry I can't give a better explanation of what's causing the problem. The internals of akka-streams are a mystery to me.
Here's a representation of the graph that caused the problem:
Source(foreverData) -> flatMapConcat(data)
    Source.single(data) -> coproductFlow
        -> normal flow ->
                      /-> flow ->\
        -> broadcast              merge -> (this output will now have two values)
                      \-> flow ->/
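The broadcast/merge part of this topology can be sketched in plain Akka Streams, without the shapeless extensions. This is only a sketch under assumptions, not the original code: it assumes Akka 2.6+ (where an implicit ActorSystem provides the materializer), the names FanOutSketch and broadcastMerge are hypothetical, and the coproductFlow and CoproductFlexiMerge are left out entirely. It shows the expectation that every element entering the sub flow should produce two elements downstream, even inside flatMapConcat.

```scala
import akka.NotUsed
import akka.actor.ActorSystem
import akka.stream.FlowShape
import akka.stream.scaladsl.{Broadcast, Flow, GraphDSL, Merge, Sink, Source}

import scala.concurrent.Await
import scala.concurrent.duration._

object FanOutSketch {
  // Hypothetical stand-in for the broadcast/merge branch:
  // each input element is sent down two flows and both results are merged.
  val broadcastMerge: Flow[Int, Int, NotUsed] =
    Flow.fromGraph(GraphDSL.create() { implicit b =>
      import GraphDSL.Implicits._
      val bcast = b.add(Broadcast[Int](2))
      val merge = b.add(Merge[Int](2))
      bcast ~> Flow[Int].map(_ * 10)  ~> merge
      bcast ~> Flow[Int].map(_ * 100) ~> merge
      FlowShape(bcast.in, merge.out)
    })

  // Runs a finite source through flatMapConcat, where each sub-source
  // is Source.single(data) followed by the broadcast/merge flow.
  def run(): Seq[Int] = {
    implicit val system: ActorSystem = ActorSystem("fan-out-sketch")
    try {
      val done = Source(1 to 3)
        .flatMapConcat(data => Source.single(data).via(broadcastMerge))
        .runWith(Sink.seq)
      Await.result(done, 5.seconds)
    } finally system.terminate()
  }

  def main(args: Array[String]): Unit = println(run().sorted)
}
```

With three inputs, six elements should reach the sink. If the same shape wrapped in coproductFlow yields fewer, that would point at the merge stage completing too early, as described above.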