Motivation
I have a use case where I need to group incoming objects by keys obtained from the objects and accumulate them until the size of the accumulation in bytes reaches a predefined threshold. If adding an object would exceed the threshold, the operator should close the current batch, emit it, and put the new object into a new batch. In addition to this size-based boundary condition, it should also honor a timeout: when the timeout elapses, it should emit the current batch and start accumulating in a new one.
Here is a simplified workflow that I would like to implement -
Desired solution
This would be achieved if the buffer size were generalized to consider the weight of the individual elements. By default, each element would have a weight of 1, which gives the same effect as using the element count as the buffer size. Either bufferTimeout could be modified to generalize the boundary computation, or a new function could be introduced, as used in the example above.
Alternatively, bufferUntil could be modified to support timeouts.
Considered alternatives
bufferTimeout did not work, as it does not support dynamic buffer size determination.
bufferTimeout with the hack mentioned here did not work either. I could not enforce a strict size limit (batches as full as possible while staying less than or equal to the limit).
bufferUntil did not work, as it does not support timeouts. I could achieve a timeout, but the hack would be too ugly.
Instead of implementing those ugly hacks, extending Flux would be much more elegant.
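To make the requested boundary rule concrete, here is a minimal plain-Java sketch of weight-based batching. It is not Reactor code and does not reflect any existing Reactor API: WeightedBatcher, counting, add, and flush are hypothetical names chosen for illustration. The timeout is only modeled, on the assumption that some timer would call flush() when it fires.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.ToLongFunction;

/** Hypothetical helper (not part of Reactor): batches items by total weight. */
final class WeightedBatcher<T> {
    private final long maxWeight;
    private final ToLongFunction<T> weigher;
    private List<T> current = new ArrayList<>();
    private long currentWeight = 0;

    WeightedBatcher(long maxWeight, ToLongFunction<T> weigher) {
        this.maxWeight = maxWeight;
        this.weigher = weigher;
    }

    /** Default weighting: every element weighs 1, so maxWeight acts as an element count. */
    static <T> WeightedBatcher<T> counting(long maxCount) {
        return new WeightedBatcher<>(maxCount, item -> 1L);
    }

    /**
     * Adds an item. If the item does not fit into the current batch, the current
     * batch is closed and returned, and the item starts a new batch.
     * Returns an empty list when nothing is emitted.
     */
    List<T> add(T item) {
        long weight = weigher.applyAsLong(item);
        List<T> emitted = List.of();
        if (!current.isEmpty() && currentWeight + weight > maxWeight) {
            emitted = flush();
        }
        current.add(item);
        currentWeight += weight;
        return emitted;
    }

    /** Emits whatever has accumulated so far; a timeout handler would call this. */
    List<T> flush() {
        List<T> batch = current;
        current = new ArrayList<>();
        currentWeight = 0;
        return batch;
    }

    public static void main(String[] args) {
        // Weight = string length, limit = 10 (stand-ins for "size in bytes" and the threshold).
        WeightedBatcher<String> batcher = new WeightedBatcher<>(10, s -> s.length());
        for (String s : List.of("aaaa", "bbbb", "cccc", "dd")) {
            List<String> emitted = batcher.add(s);
            if (!emitted.isEmpty()) {
                System.out.println("size boundary: " + emitted);
            }
        }
        // Simulated timeout: emit the partially filled batch.
        System.out.println("timeout: " + batcher.flush());
    }
}
```

In this sketch a single item heavier than the limit still forms a batch of its own, and a full batch is only emitted when the next item arrives or flush() is called. For the grouping part of the use case, one such batcher per key would be needed.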