From 91d9b42681e49f81eca70a19ef0e59b85c95b232 Mon Sep 17 00:00:00 2001 From: John Baublitz Date: Thu, 1 Feb 2024 16:50:35 -0500 Subject: [PATCH] Relax fairness restriciton on AllOrSomeLock This commit relaxes the previous problematic restriction that was meant to enforce fairness at the cost of throughput. Previously all waiting lock acquisitions were queued and if there was a conflicting lock acquisition before a lock acquisition that could currently be served, it would still be left queued. This commit relaxes that restriction due to its high potential to cause deadlocks in stratisd. --- src/engine/structures/lock.rs | 33 +++++++++++---------------------- 1 file changed, 11 insertions(+), 22 deletions(-) diff --git a/src/engine/structures/lock.rs b/src/engine/structures/lock.rs index e41c63ef45..a849a28b9b 100644 --- a/src/engine/structures/lock.rs +++ b/src/engine/structures/lock.rs @@ -292,8 +292,7 @@ where /// Returns true if the current request should be put in the wait queue. /// * Always returns false if the index for the given request is in the record of woken /// tasks. - /// * Otherwise, returns true if any of the following conditions are met: - /// * There are already tasks waiting in the queue. + /// * Otherwise, returns true if either of the following conditions are met: /// * The lock already has a conflicting acquisition. /// * The request conflicts with any tasks that have already been woken up. fn should_wait(&self, ty: &WaitType, idx: u64) -> bool { @@ -305,9 +304,7 @@ where ); false } else { - let should_wait = !self.waiting.is_empty() - || self.already_acquired(ty) - || self.conflicts_with_woken(ty); + let should_wait = self.already_acquired(ty) || self.conflicts_with_woken(ty); if should_wait { trace!( "Putting task with index {}, wait type {:?} to sleep", @@ -372,25 +369,17 @@ where } } - /// Determines whether a task should be woken up from the queue. - /// Returns true if: - /// * The waiting task does not conflict with any already woken tasks. - /// * The waiting task does not conflict with any locks currently held. - fn should_wake(&self) -> bool { - if let Some(w) = self.waiting.front() { - !self.conflicts_with_woken(&w.ty) && !self.already_acquired(&w.ty) - } else { - false - } - } - - /// Wake all non-conflicting tasks in the queue and stop at the first conflicting task. + /// Wake all non-conflicting tasks in the queue. /// Adds all woken tasks to the record of woken tasks. fn wake(&mut self) { - while self.should_wake() { - if let Some(w) = self.waiting.pop_front() { - self.woken.insert(w.idx, w.ty); - w.waker.wake(); + let mut waiting = VecDeque::new(); + std::mem::swap(&mut waiting, &mut self.waiting); + for waiter in waiting.drain(..) { + if !self.conflicts_with_woken(&waiter.ty) && !self.already_acquired(&waiter.ty) { + self.woken.insert(waiter.idx, waiter.ty); + waiter.waker.wake(); + } else { + self.waiting.push_back(waiter); } } }