From 4d4b8b321481a3d55b525d24aab2c7270613ef5c Mon Sep 17 00:00:00 2001 From: Frank McSherry Date: Fri, 29 Jan 2021 18:26:58 -0500 Subject: [PATCH] protect upsert workers from input overload --- src/operators/arrange/upsert.rs | 13 ++++++++++--- 1 file changed, 10 insertions(+), 3 deletions(-) diff --git a/src/operators/arrange/upsert.rs b/src/operators/arrange/upsert.rs index 88f137197..b890e4603 100644 --- a/src/operators/arrange/upsert.rs +++ b/src/operators/arrange/upsert.rs @@ -193,16 +193,23 @@ where let mut priority_queue = BinaryHeap::)>>::new(); let mut updates = Vec::new(); + let mut buffers = Vec::new(); + move |input, output| { - // Stash capabilities and associated data (ordered by time). + // Read input out without doing work, to avoid trapping overloaded workers. input.for_each(|cap, data| { capabilities.insert(cap.retain()); data.swap(&mut buffer); - for (key, val, time) in buffer.drain(..) { + buffers.push(std::mem::take(&mut buffer)); + }); + + // Stash capabilities and associated data (ordered by time). + for buffer in buffers.drain(..) { + for (key, val, time) in buffer { priority_queue.push(std::cmp::Reverse((time, key, val))) } - }); + } // Test to see if strict progress has occurred, which happens whenever any element of // the old frontier is not greater or equal to the new frontier. It is only in this