Skip to content

Commit

Permalink
Enforce one-execution-per-chain
Browse files Browse the repository at this point in the history
One of the fallouts of executing callbacks in a deferred state is that
the locks no longer prevent cyclic updates. For well defined values,
this works perfectly. But for something like floating points, this can
cause not-quite-the-same values to continue to propagate until they
stabilize. The 7guis-temperature-converter, for example, would
sometimes replace the currently editing text with an imprecise float
result computed from a roundtrip temperature conversion.

This change adds tracking to these deferred callbacks that ensures for
each "root" invocation of a dynamic's callbacks, all affected change
callbacks are invoked a single time in any given execution chain, taking
care to support divergent chains causing an individual set of callbacks
to be invoked more than one time as long as their individiaul call
chains have not yet invoked the callbacks.
  • Loading branch information
ecton committed Jan 5, 2025
1 parent b09dbb8 commit f1a4f95
Showing 1 changed file with 214 additions and 37 deletions.
251 changes: 214 additions & 37 deletions src/value.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2306,6 +2306,10 @@ impl ChangeCallbacks {
self.data.lock.sync.notify_all();
count
}

fn id(&self) -> CallbacksId {
CallbacksId(Arc::as_ptr(&self.data) as usize)
}
}

trait ValueCallback: Send {
Expand Down Expand Up @@ -4565,76 +4569,249 @@ where
}

fn defer_execute_callbacks(callbacks: ChangeCallbacks) {
static THREAD_SENDER: Lazy<mpsc::SyncSender<ChangeCallbacks>> = Lazy::new(|| {
let (sender, receiver) = mpsc::sync_channel(256);
std::thread::spawn(move || CallbackExecutor::new(receiver).run());
sender
});
let _ = THREAD_SENDER.send(callbacks);
static THREAD_SENDER: Lazy<mpsc::SyncSender<(ChangeCallbacks, Option<CallbackInvocationId>)>> =
Lazy::new(|| {
let (sender, receiver) = mpsc::sync_channel(256);
std::thread::spawn(move || CallbackExecutor::new(receiver).run());
sender
});
let id = EXECUTING_CALLBACK_ROOT.get();
let _ = THREAD_SENDER.send((callbacks, id));
}

#[derive(Clone, Copy, Eq, PartialEq, Ord, PartialOrd)]
struct CallbacksId(usize);

#[derive(Clone, Copy, Eq, PartialEq)]
struct CallbackInvocationId {
node: LotId,
invocation_root: CallbacksId,
}

struct InvocationTreeNode {
id: CallbacksId,
enqueued: Option<ChangeCallbacks>,
parent: Option<LotId>,
first_child: Option<LotId>,
root_callbacks: CallbacksId,
next: Option<LotId>,
}

#[derive(Default)]
struct InvocationTree {
nodes: Lots<InvocationTreeNode>,
}

impl InvocationTree {
fn new_root(&mut self, callbacks: ChangeCallbacks) -> CallbackInvocationId {
let callbacks_id = callbacks.id();
let node = self.nodes.push(InvocationTreeNode {
id: callbacks_id,
parent: None,
first_child: None,
root_callbacks: callbacks_id,
enqueued: Some(callbacks),
next: None,
});
CallbackInvocationId {
node,
invocation_root: callbacks_id,
}
}

/// Pushes `invoked` into the list of callbacks executed by group of
/// callbacks pointed to by `invoked_by`.
///
/// # Errors
///
/// Returns an error if `invoked` has already been executed by this group of
/// callbacks.
fn push(
&mut self,
callbacks: ChangeCallbacks,
enqueued_while_executing: Option<CallbackInvocationId>,
) -> Option<CallbackInvocationId> {
if let Some(enqueued_while_executing) = enqueued_while_executing {
// Verify that `callbacks` wasn't executed in the chain leading to the node that was executing this node.
let mut search = enqueued_while_executing.node;
let callbacks_id = callbacks.id();
loop {
let node = &self.nodes[search];
if node.id == callbacks_id {
// This set of callbacks has already been executed in this
// chain.
return None;
}
let Some(parent) = node.parent else {
break;
};
search = parent;
}
let root_invoked_by = enqueued_while_executing.invocation_root;
let insert_before =
if let Some(mut node_id) = self.nodes[enqueued_while_executing.node].first_child {
loop {
let node = &mut self.nodes[node_id];
if node.id == callbacks_id {
if let Some(enqueued) = &mut node.enqueued {
enqueued.changed_at = enqueued.changed_at.max(callbacks.changed_at);
return None;
}

node.enqueued = Some(callbacks);
return Some(CallbackInvocationId {
node: node_id,
invocation_root: root_invoked_by,
});
}

if let Some(next) = node.next {
// Continue traversing the list.
node_id = next;
} else {
break Some(node_id);
}
}
} else {
None
};

// `callbacks` hasn't been executed by the list pointed at by
// `enqueued_while_executing`.
let id = self.nodes.push(InvocationTreeNode {
id: callbacks.id(),
enqueued: Some(callbacks),
parent: Some(enqueued_while_executing.node),
first_child: None,
root_callbacks: root_invoked_by,
next: insert_before,
});
self.nodes[enqueued_while_executing.node].first_child = Some(id);
Some(CallbackInvocationId {
node: id,
invocation_root: root_invoked_by,
})
} else {
// New root
Some(self.new_root(callbacks))
}
}

fn complete(&mut self, invocation: CallbackInvocationId) {
self.remove_completed_recursive(invocation.node);
}

fn remove_completed_recursive(&mut self, mut node_id: LotId) {
let mut node = &mut self.nodes[node_id];
while node.enqueued.is_none() && node.first_child.is_none() {
let removed = self.nodes.remove(node_id).expect("node present");
let Some(parent_id) = removed.parent else {
break;
};
let mut parent = &mut self.nodes[parent_id];
// Repair the linked list
if parent.first_child == Some(node_id) {
parent.first_child = removed.next;
} else {
let mut current = parent.first_child.expect("valid child");
while self.nodes[current].next != Some(node_id) {
current = self.nodes[current].next.expect("removed node to exist");
}
self.nodes[current].next = removed.next;
parent = &mut self.nodes[parent_id];
}

// Attempt to remove the parent if this was its last node.
node = parent;
node_id = parent_id;
}
}
}

thread_local! {
static EXECUTING_CALLBACK_ROOT: Cell<Option<CallbackInvocationId>> = const { Cell::new(None) };
}

struct EnqueuedCallbacks {
node_id: CallbackInvocationId,
callbacks: ChangeCallbacks,
}

struct CallbackExecutor {
receiver: mpsc::Receiver<ChangeCallbacks>,
enqueued: Map<*const ChangeCallbacksData, LotId>,
callbacks: Lots<ChangeCallbacks>,
receiver: mpsc::Receiver<(ChangeCallbacks, Option<CallbackInvocationId>)>,

invocations: InvocationTree,
queue: VecDeque<LotId>,
}

impl CallbackExecutor {
fn new(receiver: mpsc::Receiver<ChangeCallbacks>) -> Self {
fn new(receiver: mpsc::Receiver<(ChangeCallbacks, Option<CallbackInvocationId>)>) -> Self {
Self {
receiver,
enqueued: Map::new(),
callbacks: Lots::new(),
invocations: InvocationTree::default(),
queue: VecDeque::new(),
}
}

fn enqueue_nonblocking(&mut self) {
// Exhaust any pending callbacks without blocking.
while let Ok((callbacks, invoked_by)) = self.receiver.try_recv() {
self.enqueue(callbacks, invoked_by);
}
}

fn run(mut self) {
IS_EXECUTOR_THREAD.set(true);

// Because this is stored in a static, this likely will never return an
// error, but if it does, it's during program shutdown, and we can exit safely.
while let Ok(callbacks) = self.receiver.recv() {
self.enqueue(callbacks);
while let Ok((callbacks, invoked_by)) = self.receiver.recv() {
self.enqueue(callbacks, invoked_by);
let mut callbacks_executed = 0;
loop {
// Exhaust any pending callbacks without blocking.
while let Ok(callbacks) = self.receiver.try_recv() {
self.enqueue(callbacks);
}

let Some(callbacks) = self.pop_callbacks() else {
let Some(enqueued) = self.pop_callbacks() else {
break;
};
callbacks_executed += callbacks.execute();
EXECUTING_CALLBACK_ROOT.set(Some(enqueued.node_id));
callbacks_executed += enqueued.callbacks.execute();

// Enqueue any queued operations before we complete this
// invocation to ensure all related invocations are tracked.
self.enqueue_nonblocking();
self.invocations.complete(enqueued.node_id);
}
EXECUTING_CALLBACK_ROOT.set(None);

// Once we've exited the loop, we can assume all callback invocation
// chains have completed.
assert!(self.invocations.nodes.is_empty());

if callbacks_executed > 0 {
tracing::trace!("{callbacks_executed} callbacks executed");
}
}
}

fn enqueue(&mut self, callbacks: ChangeCallbacks) {
match self.enqueued.entry(Arc::as_ptr(&callbacks.data)) {
map::Entry::Occupied(id) => {
self.callbacks[*id].changed_at =
self.callbacks[*id].changed_at.max(callbacks.changed_at);
}
map::Entry::Vacant(entry) => {
let id = self.callbacks.push(callbacks);
entry.insert(id);
self.queue.push_back(id);
}
fn enqueue(&mut self, callbacks: ChangeCallbacks, invoked_by: Option<CallbackInvocationId>) {
if let Some(pushed) = self.invocations.push(callbacks, invoked_by) {
self.queue.push_back(pushed.node);
}
}

fn pop_callbacks(&mut self) -> Option<ChangeCallbacks> {
let id = self.queue.pop_front()?;
let callbacks = self.callbacks.remove(id)?;
self.enqueued.remove(&Arc::as_ptr(&callbacks.data));
Some(callbacks)
fn pop_callbacks(&mut self) -> Option<EnqueuedCallbacks> {
while let Some(id) = self.queue.pop_front() {
if let Some(callbacks) = self.invocations.nodes[id].enqueued.take() {
return Some(EnqueuedCallbacks {
callbacks,
node_id: CallbackInvocationId {
node: id,
invocation_root: self.invocations.nodes[id].root_callbacks,
},
});
}
}

None
}

fn is_current_thread() -> bool {
Expand Down

0 comments on commit f1a4f95

Please sign in to comment.