From f1a4f955e78180bab877a6a6db7ca7fe259a0521 Mon Sep 17 00:00:00 2001 From: Jonathan Johnson Date: Sun, 5 Jan 2025 08:15:10 -0800 Subject: [PATCH] Enforce one-execution-per-chain One of the fallouts of executing callbacks in a deferred state is that the locks no longer prevent cyclic updates. For well defined values, this works perfectly. But for something like floating points, this can cause not-quite-the-same values to continue to propagate until they stabilize. The 7guis-temperature-converter, for example, would sometimes replace the currently editing text with an imprecise float result computed from a roundtrip temperature conversion. This change adds tracking to these deferred callbacks that ensures for each "root" invocation of a dynamic's callbacks, all affected change callbacks are invoked a single time in any given execution chain, taking care to support divergent chains causing an individual set of callbacks to be invoked more than one time as long as their individiaul call chains have not yet invoked the callbacks. --- src/value.rs | 251 +++++++++++++++++++++++++++++++++++++++++++-------- 1 file changed, 214 insertions(+), 37 deletions(-) diff --git a/src/value.rs b/src/value.rs index dbcc4fb7a..7956d79ed 100644 --- a/src/value.rs +++ b/src/value.rs @@ -2306,6 +2306,10 @@ impl ChangeCallbacks { self.data.lock.sync.notify_all(); count } + + fn id(&self) -> CallbacksId { + CallbacksId(Arc::as_ptr(&self.data) as usize) + } } trait ValueCallback: Send { @@ -4565,50 +4569,222 @@ where } fn defer_execute_callbacks(callbacks: ChangeCallbacks) { - static THREAD_SENDER: Lazy> = Lazy::new(|| { - let (sender, receiver) = mpsc::sync_channel(256); - std::thread::spawn(move || CallbackExecutor::new(receiver).run()); - sender - }); - let _ = THREAD_SENDER.send(callbacks); + static THREAD_SENDER: Lazy)>> = + Lazy::new(|| { + let (sender, receiver) = mpsc::sync_channel(256); + std::thread::spawn(move || CallbackExecutor::new(receiver).run()); + sender + }); + let id = EXECUTING_CALLBACK_ROOT.get(); + let _ = THREAD_SENDER.send((callbacks, id)); +} + +#[derive(Clone, Copy, Eq, PartialEq, Ord, PartialOrd)] +struct CallbacksId(usize); + +#[derive(Clone, Copy, Eq, PartialEq)] +struct CallbackInvocationId { + node: LotId, + invocation_root: CallbacksId, +} + +struct InvocationTreeNode { + id: CallbacksId, + enqueued: Option, + parent: Option, + first_child: Option, + root_callbacks: CallbacksId, + next: Option, +} + +#[derive(Default)] +struct InvocationTree { + nodes: Lots, +} + +impl InvocationTree { + fn new_root(&mut self, callbacks: ChangeCallbacks) -> CallbackInvocationId { + let callbacks_id = callbacks.id(); + let node = self.nodes.push(InvocationTreeNode { + id: callbacks_id, + parent: None, + first_child: None, + root_callbacks: callbacks_id, + enqueued: Some(callbacks), + next: None, + }); + CallbackInvocationId { + node, + invocation_root: callbacks_id, + } + } + + /// Pushes `invoked` into the list of callbacks executed by group of + /// callbacks pointed to by `invoked_by`. + /// + /// # Errors + /// + /// Returns an error if `invoked` has already been executed by this group of + /// callbacks. + fn push( + &mut self, + callbacks: ChangeCallbacks, + enqueued_while_executing: Option, + ) -> Option { + if let Some(enqueued_while_executing) = enqueued_while_executing { + // Verify that `callbacks` wasn't executed in the chain leading to the node that was executing this node. + let mut search = enqueued_while_executing.node; + let callbacks_id = callbacks.id(); + loop { + let node = &self.nodes[search]; + if node.id == callbacks_id { + // This set of callbacks has already been executed in this + // chain. + return None; + } + let Some(parent) = node.parent else { + break; + }; + search = parent; + } + let root_invoked_by = enqueued_while_executing.invocation_root; + let insert_before = + if let Some(mut node_id) = self.nodes[enqueued_while_executing.node].first_child { + loop { + let node = &mut self.nodes[node_id]; + if node.id == callbacks_id { + if let Some(enqueued) = &mut node.enqueued { + enqueued.changed_at = enqueued.changed_at.max(callbacks.changed_at); + return None; + } + + node.enqueued = Some(callbacks); + return Some(CallbackInvocationId { + node: node_id, + invocation_root: root_invoked_by, + }); + } + + if let Some(next) = node.next { + // Continue traversing the list. + node_id = next; + } else { + break Some(node_id); + } + } + } else { + None + }; + + // `callbacks` hasn't been executed by the list pointed at by + // `enqueued_while_executing`. + let id = self.nodes.push(InvocationTreeNode { + id: callbacks.id(), + enqueued: Some(callbacks), + parent: Some(enqueued_while_executing.node), + first_child: None, + root_callbacks: root_invoked_by, + next: insert_before, + }); + self.nodes[enqueued_while_executing.node].first_child = Some(id); + Some(CallbackInvocationId { + node: id, + invocation_root: root_invoked_by, + }) + } else { + // New root + Some(self.new_root(callbacks)) + } + } + + fn complete(&mut self, invocation: CallbackInvocationId) { + self.remove_completed_recursive(invocation.node); + } + + fn remove_completed_recursive(&mut self, mut node_id: LotId) { + let mut node = &mut self.nodes[node_id]; + while node.enqueued.is_none() && node.first_child.is_none() { + let removed = self.nodes.remove(node_id).expect("node present"); + let Some(parent_id) = removed.parent else { + break; + }; + let mut parent = &mut self.nodes[parent_id]; + // Repair the linked list + if parent.first_child == Some(node_id) { + parent.first_child = removed.next; + } else { + let mut current = parent.first_child.expect("valid child"); + while self.nodes[current].next != Some(node_id) { + current = self.nodes[current].next.expect("removed node to exist"); + } + self.nodes[current].next = removed.next; + parent = &mut self.nodes[parent_id]; + } + + // Attempt to remove the parent if this was its last node. + node = parent; + node_id = parent_id; + } + } +} + +thread_local! { + static EXECUTING_CALLBACK_ROOT: Cell> = const { Cell::new(None) }; +} + +struct EnqueuedCallbacks { + node_id: CallbackInvocationId, + callbacks: ChangeCallbacks, } struct CallbackExecutor { - receiver: mpsc::Receiver, - enqueued: Map<*const ChangeCallbacksData, LotId>, - callbacks: Lots, + receiver: mpsc::Receiver<(ChangeCallbacks, Option)>, + + invocations: InvocationTree, queue: VecDeque, } impl CallbackExecutor { - fn new(receiver: mpsc::Receiver) -> Self { + fn new(receiver: mpsc::Receiver<(ChangeCallbacks, Option)>) -> Self { Self { receiver, - enqueued: Map::new(), - callbacks: Lots::new(), + invocations: InvocationTree::default(), queue: VecDeque::new(), } } + fn enqueue_nonblocking(&mut self) { + // Exhaust any pending callbacks without blocking. + while let Ok((callbacks, invoked_by)) = self.receiver.try_recv() { + self.enqueue(callbacks, invoked_by); + } + } + fn run(mut self) { IS_EXECUTOR_THREAD.set(true); // Because this is stored in a static, this likely will never return an // error, but if it does, it's during program shutdown, and we can exit safely. - while let Ok(callbacks) = self.receiver.recv() { - self.enqueue(callbacks); + while let Ok((callbacks, invoked_by)) = self.receiver.recv() { + self.enqueue(callbacks, invoked_by); let mut callbacks_executed = 0; loop { - // Exhaust any pending callbacks without blocking. - while let Ok(callbacks) = self.receiver.try_recv() { - self.enqueue(callbacks); - } - - let Some(callbacks) = self.pop_callbacks() else { + let Some(enqueued) = self.pop_callbacks() else { break; }; - callbacks_executed += callbacks.execute(); + EXECUTING_CALLBACK_ROOT.set(Some(enqueued.node_id)); + callbacks_executed += enqueued.callbacks.execute(); + + // Enqueue any queued operations before we complete this + // invocation to ensure all related invocations are tracked. + self.enqueue_nonblocking(); + self.invocations.complete(enqueued.node_id); } + EXECUTING_CALLBACK_ROOT.set(None); + + // Once we've exited the loop, we can assume all callback invocation + // chains have completed. + assert!(self.invocations.nodes.is_empty()); if callbacks_executed > 0 { tracing::trace!("{callbacks_executed} callbacks executed"); @@ -4616,25 +4792,26 @@ impl CallbackExecutor { } } - fn enqueue(&mut self, callbacks: ChangeCallbacks) { - match self.enqueued.entry(Arc::as_ptr(&callbacks.data)) { - map::Entry::Occupied(id) => { - self.callbacks[*id].changed_at = - self.callbacks[*id].changed_at.max(callbacks.changed_at); - } - map::Entry::Vacant(entry) => { - let id = self.callbacks.push(callbacks); - entry.insert(id); - self.queue.push_back(id); - } + fn enqueue(&mut self, callbacks: ChangeCallbacks, invoked_by: Option) { + if let Some(pushed) = self.invocations.push(callbacks, invoked_by) { + self.queue.push_back(pushed.node); } } - fn pop_callbacks(&mut self) -> Option { - let id = self.queue.pop_front()?; - let callbacks = self.callbacks.remove(id)?; - self.enqueued.remove(&Arc::as_ptr(&callbacks.data)); - Some(callbacks) + fn pop_callbacks(&mut self) -> Option { + while let Some(id) = self.queue.pop_front() { + if let Some(callbacks) = self.invocations.nodes[id].enqueued.take() { + return Some(EnqueuedCallbacks { + callbacks, + node_id: CallbackInvocationId { + node: id, + invocation_root: self.invocations.nodes[id].root_callbacks, + }, + }); + } + } + + None } fn is_current_thread() -> bool {