From f19edde93252381b7a1789bf856a3a67df23f6db Mon Sep 17 00:00:00 2001 From: Nagata Parama Aptana <61454123+lifers@users.noreply.github.com> Date: Fri, 16 Aug 2024 13:04:19 -0400 Subject: [PATCH] Improved `Event` implementation (#3198) --- crates/libs/core/src/event.rs | 291 ++++++------------------------ crates/tests/event/tests/tests.rs | 49 +++-- 2 files changed, 93 insertions(+), 247 deletions(-) diff --git a/crates/libs/core/src/event.rs b/crates/libs/core/src/event.rs index 18211f9c1b..c04b2f8352 100644 --- a/crates/libs/core/src/event.rs +++ b/crates/libs/core/src/event.rs @@ -1,20 +1,18 @@ use super::*; -use core::ffi::c_void; -use core::marker::PhantomData; -use core::mem::{size_of, transmute_copy}; -use core::ptr::null_mut; -use std::sync::Mutex; +use core::{iter::once, mem::transmute_copy}; +use std::sync::{Arc, RwLock}; /// A type that you can use to declare and implement an event of a specified delegate type. /// /// The implementation is thread-safe and designed to avoid contention between events being /// raised and delegates being added or removed. pub struct Event { - swap: Mutex<()>, - change: Mutex<()>, - delegates: Array, + delegates: RwLock]>>>, } +unsafe impl Send for Event {} +unsafe impl Sync for Event {} + impl Default for Event { fn default() -> Self { Self::new() @@ -25,228 +23,86 @@ impl Event { /// Creates a new, empty `Event`. pub fn new() -> Self { Self { - delegates: Array::new(), - swap: Mutex::default(), - change: Mutex::default(), + delegates: RwLock::new(None), } } /// Registers a delegate with the event object. - pub fn add(&mut self, delegate: &T) -> Result { - let mut _lock_free_drop = Array::new(); - Ok({ - let _change_lock = self.change.lock().unwrap(); - let mut new_delegates = Array::with_capacity(self.delegates.len() + 1)?; - for delegate in self.delegates.as_slice() { - new_delegates.push(delegate.clone()); - } - let delegate = Delegate::new(delegate)?; - let token = delegate.to_token(); - new_delegates.push(delegate); + pub fn add(&self, delegate: &T) -> Result { + let new_delegate = Delegate::new(delegate)?; + let token = new_delegate.to_token(); + let new_iter = once(new_delegate); + let mut guard = self.delegates.write().unwrap(); + + let new_list = if let Some(old_delegates) = guard.as_ref() { + Arc::from_iter(old_delegates.iter().cloned().chain(new_iter)) + } else { + Arc::from_iter(new_iter) + }; + + let old_list = guard.replace(new_list); + drop(guard); + drop(old_list); // drop the old delegates _after_ releasing lock - let _swap_lock = self.swap.lock().unwrap(); - _lock_free_drop = self.delegates.swap(new_delegates); - token - }) + Ok(token) } /// Revokes a delegate's registration from the event object. - pub fn remove(&mut self, token: i64) -> Result<()> { - let mut _lock_free_drop = Array::new(); - { - let _change_lock = self.change.lock().unwrap(); - if self.delegates.is_empty() { - return Ok(()); - } - let mut capacity = self.delegates.len() - 1; - let mut new_delegates = Array::new(); - let mut removed = false; - if capacity == 0 { - removed = self.delegates.as_slice()[0].to_token() == token; - } else { - new_delegates = Array::with_capacity(capacity)?; - for delegate in self.delegates.as_slice() { - if !removed && delegate.to_token() == token { - removed = true; - continue; - } - if capacity == 0 { - break; - } - new_delegates.push(delegate.clone()); - capacity -= 1; - } - } - if removed { - let _swap_lock = self.swap.lock().unwrap(); - _lock_free_drop = self.delegates.swap(new_delegates); + pub fn remove(&self, token: i64) { + let mut guard = self.delegates.write().unwrap(); + let mut old_list = None; + if let Some(old_delegates) = guard.as_ref() { + // `self.delegates` is only modified if the token is found. + if let Some(i) = old_delegates + .iter() + .position(|old_delegate| old_delegate.to_token() == token) + { + let new_list = Arc::from_iter( + old_delegates[..i] + .iter() + .chain(old_delegates[i + 1..].iter()) + .cloned(), + ); + + old_list = guard.replace(new_list); } } - Ok(()) + drop(guard); + drop(old_list); // drop the old delegates _after_ releasing lock } /// Clears the event, removing all delegates. - pub fn clear(&mut self) { - let mut _lock_free_drop = Array::new(); - { - let _change_lock = self.change.lock().unwrap(); - if self.delegates.is_empty() { - return; - } - let _swap_lock = self.swap.lock().unwrap(); - _lock_free_drop = self.delegates.swap(Array::new()); - } + pub fn clear(&self) { + let mut guard = self.delegates.write().unwrap(); + let old_list = guard.take(); + drop(guard); + drop(old_list); // drop the old delegates _after_ releasing lock } /// Invokes all of the event object's registered delegates with the provided callback. - pub fn call Result<()>>(&mut self, mut callback: F) -> Result<()> { - let lock_free_calls = { - let _swap_lock = self.swap.lock().unwrap(); - self.delegates.clone() + pub fn call Result<()>>(&self, mut callback: F) { + let delegates = { + let guard = self.delegates.read().unwrap(); + if let Some(delegates) = guard.as_ref() { + delegates.clone() + } else { + // No delegates to call. + return; + } + // <-- lock is released here }; - for delegate in lock_free_calls.as_slice() { + + for delegate in delegates.iter() { if let Err(error) = delegate.call(&mut callback) { const RPC_E_SERVER_UNAVAILABLE: HRESULT = HRESULT(-2147023174); // HRESULT_FROM_WIN32(RPC_S_SERVER_UNAVAILABLE) if matches!( error.code(), imp::RPC_E_DISCONNECTED | imp::JSCRIPT_E_CANTEXECUTE | RPC_E_SERVER_UNAVAILABLE ) { - self.remove(delegate.to_token())?; + self.remove(delegate.to_token()); } } } - Ok(()) - } -} - -/// A thread-safe reference-counted array of delegates. -struct Array { - buffer: *mut Buffer, - len: usize, - _phantom: PhantomData, -} - -impl Default for Array { - fn default() -> Self { - Self::new() - } -} - -impl Array { - /// Creates a new, empty `Array` with no capacity. - fn new() -> Self { - Self { - buffer: null_mut(), - len: 0, - _phantom: PhantomData, - } - } - - /// Creates a new, empty `Array` with the specified capacity. - fn with_capacity(capacity: usize) -> Result { - Ok(Self { - buffer: Buffer::new(capacity)?, - len: 0, - _phantom: PhantomData, - }) - } - - /// Swaps the contents of two `Array` objects. - fn swap(&mut self, mut other: Self) -> Self { - unsafe { core::ptr::swap(&mut self.buffer, &mut other.buffer) }; - core::mem::swap(&mut self.len, &mut other.len); - other - } - - /// Returns `true` if the array contains no delegates. - fn is_empty(&self) -> bool { - self.len == 0 - } - - /// Returns the number of delegates in the array. - fn len(&self) -> usize { - self.len - } - - /// Appends a delegate to the back of the array. - fn push(&mut self, delegate: Delegate) { - unsafe { - (*self.buffer).as_mut_ptr().add(self.len).write(delegate); - self.len += 1; - } - } - - /// Returns a slice containing of all delegates. - fn as_slice(&self) -> &[Delegate] { - if self.is_empty() { - &[] - } else { - unsafe { core::slice::from_raw_parts((*self.buffer).as_ptr(), self.len) } - } - } - - /// Returns a mutable slice of all delegates. - fn as_mut_slice(&mut self) -> &mut [Delegate] { - if self.is_empty() { - &mut [] - } else { - unsafe { core::slice::from_raw_parts_mut((*self.buffer).as_mut_ptr(), self.len) } - } - } -} - -impl Clone for Array { - fn clone(&self) -> Self { - if !self.is_empty() { - unsafe { (*self.buffer).0.add_ref() }; - } - Self { - buffer: self.buffer, - len: self.len, - _phantom: PhantomData, - } - } -} - -impl Drop for Array { - fn drop(&mut self) { - unsafe { - if !self.is_empty() && (*self.buffer).0.release() == 0 { - core::ptr::drop_in_place(self.as_mut_slice()); - heap_free(self.buffer as _) - } - } - } -} - -/// A reference-counted buffer. -#[repr(C)] -#[repr(align(8))] -struct Buffer(imp::RefCount, PhantomData); - -impl Buffer { - /// Creates a new `Buffer` with the specified size in bytes. - fn new(len: usize) -> Result<*mut Self> { - if len == 0 { - Ok(null_mut()) - } else { - let alloc_size = size_of::() + len * size_of::>(); - let header = heap_alloc(alloc_size)? as *mut Self; - unsafe { - header.write(Self(imp::RefCount::new(1), PhantomData)); - } - Ok(header) - } - } - - /// Returns a raw pointer to the buffer's contents. The resulting pointer might be uninititalized. - fn as_ptr(&self) -> *const Delegate { - unsafe { (self as *const Self).add(1) as *const _ } - } - - /// Returns a raw mutable pointer to the buffer's contents. The resulting pointer might be uninititalized. - fn as_mut_ptr(&mut self) -> *mut Delegate { - unsafe { (self as *mut Self).add(1) as *mut _ } } } @@ -286,30 +142,3 @@ impl Delegate { } } } - -/// Allocate memory of size `bytes` using `malloc` - the `Event` implementation does not -/// need to use any particular allocator so `HeapAlloc` need not be used. -fn heap_alloc(bytes: usize) -> crate::Result<*mut c_void> { - let ptr: *mut c_void = unsafe { - extern "C" { - fn malloc(bytes: usize) -> *mut c_void; - } - - malloc(bytes) - }; - - if ptr.is_null() { - Err(Error::from_hresult(imp::E_OUTOFMEMORY)) - } else { - Ok(ptr) - } -} - -/// Free memory allocated by `heap_alloc`. -unsafe fn heap_free(ptr: *mut c_void) { - extern "C" { - fn free(ptr: *mut c_void); - } - - free(ptr); -} diff --git a/crates/tests/event/tests/tests.rs b/crates/tests/event/tests/tests.rs index fe27dc2530..06b964766c 100644 --- a/crates/tests/event/tests/tests.rs +++ b/crates/tests/event/tests/tests.rs @@ -4,10 +4,10 @@ use windows::{core::*, Foundation::*}; #[test] fn add_remove() -> Result<()> { - let mut event = Event::>::new(); + let event = Event::>::new(); // Remove a bogus event handler from an empty event source. - event.remove(-123)?; + event.remove(-123); let check = Arc::new(AtomicI32::new(0)); let check_sender = check.clone(); @@ -19,11 +19,11 @@ fn add_remove() -> Result<()> { }))?; // Remove a bogus event handler from a non-empty event source. - event.remove(-123)?; + event.remove(-123); // Raise and observe event. assert_eq!(check.load(Ordering::Relaxed), 0); - event.call(|delegate| delegate.Invoke(None, 123))?; + event.call(|delegate| delegate.Invoke(None, 123)); assert_eq!(check.load(Ordering::Relaxed), 123); // Remove event handler. @@ -31,7 +31,7 @@ fn add_remove() -> Result<()> { // Raise event without effect. check.store(0, Ordering::Relaxed); - event.call(|delegate| delegate.Invoke(None, 123))?; + event.call(|delegate| delegate.Invoke(None, 123)); assert_eq!(check.load(Ordering::Relaxed), 0); Ok(()) @@ -39,7 +39,7 @@ fn add_remove() -> Result<()> { #[test] fn multiple() -> Result<()> { - let mut event = Event::>::new(); + let event = Event::>::new(); let a_check = Arc::new(AtomicI32::new(0)); let a_check_sender = a_check.clone(); @@ -51,7 +51,7 @@ fn multiple() -> Result<()> { assert_eq!(a_check.load(Ordering::Relaxed), 0); assert_eq!(b_check.load(Ordering::Relaxed), 0); assert_eq!(c_check.load(Ordering::Relaxed), 0); - event.call(|delegate| delegate.Invoke(None, 10))?; + event.call(|delegate| delegate.Invoke(None, 10)); assert_eq!(a_check.load(Ordering::Relaxed), 0); assert_eq!(b_check.load(Ordering::Relaxed), 0); assert_eq!(c_check.load(Ordering::Relaxed), 0); @@ -64,7 +64,7 @@ fn multiple() -> Result<()> { assert_eq!(a_check.load(Ordering::Relaxed), 0); assert_eq!(b_check.load(Ordering::Relaxed), 0); assert_eq!(c_check.load(Ordering::Relaxed), 0); - event.call(|delegate| delegate.Invoke(None, 10))?; + event.call(|delegate| delegate.Invoke(None, 10)); assert_eq!(a_check.load(Ordering::Relaxed), 10); assert_eq!(b_check.load(Ordering::Relaxed), 0); assert_eq!(c_check.load(Ordering::Relaxed), 0); @@ -77,7 +77,7 @@ fn multiple() -> Result<()> { assert_eq!(a_check.load(Ordering::Relaxed), 10); assert_eq!(b_check.load(Ordering::Relaxed), 0); assert_eq!(c_check.load(Ordering::Relaxed), 0); - event.call(|delegate| delegate.Invoke(None, 20))?; + event.call(|delegate| delegate.Invoke(None, 20)); assert_eq!(a_check.load(Ordering::Relaxed), 20); assert_eq!(b_check.load(Ordering::Relaxed), 20); assert_eq!(c_check.load(Ordering::Relaxed), 0); @@ -90,40 +90,57 @@ fn multiple() -> Result<()> { assert_eq!(a_check.load(Ordering::Relaxed), 20); assert_eq!(b_check.load(Ordering::Relaxed), 20); assert_eq!(c_check.load(Ordering::Relaxed), 0); - event.call(|delegate| delegate.Invoke(None, 30))?; + event.call(|delegate| delegate.Invoke(None, 30)); assert_eq!(a_check.load(Ordering::Relaxed), 30); assert_eq!(b_check.load(Ordering::Relaxed), 30); assert_eq!(c_check.load(Ordering::Relaxed), 30); - event.remove(a_token)?; + event.remove(a_token); assert_eq!(a_check.load(Ordering::Relaxed), 30); assert_eq!(b_check.load(Ordering::Relaxed), 30); assert_eq!(c_check.load(Ordering::Relaxed), 30); - event.call(|delegate| delegate.Invoke(None, 40))?; + event.call(|delegate| delegate.Invoke(None, 40)); assert_eq!(a_check.load(Ordering::Relaxed), 30); assert_eq!(b_check.load(Ordering::Relaxed), 40); assert_eq!(c_check.load(Ordering::Relaxed), 40); - event.remove(b_token)?; + event.remove(b_token); assert_eq!(a_check.load(Ordering::Relaxed), 30); assert_eq!(b_check.load(Ordering::Relaxed), 40); assert_eq!(c_check.load(Ordering::Relaxed), 40); - event.call(|delegate| delegate.Invoke(None, 50))?; + event.call(|delegate| delegate.Invoke(None, 50)); assert_eq!(a_check.load(Ordering::Relaxed), 30); assert_eq!(b_check.load(Ordering::Relaxed), 40); assert_eq!(c_check.load(Ordering::Relaxed), 50); - event.remove(c_token)?; + event.remove(c_token); assert_eq!(a_check.load(Ordering::Relaxed), 30); assert_eq!(b_check.load(Ordering::Relaxed), 40); assert_eq!(c_check.load(Ordering::Relaxed), 50); - event.call(|delegate| delegate.Invoke(None, 60))?; + event.call(|delegate| delegate.Invoke(None, 60)); assert_eq!(a_check.load(Ordering::Relaxed), 30); assert_eq!(b_check.load(Ordering::Relaxed), 40); assert_eq!(c_check.load(Ordering::Relaxed), 50); Ok(()) } + +#[test] +fn is_send_sync() -> Result<()> { + // test that the event can be sent and synced between threads + let event = Arc::new(Event::>::new()); + let event_sender = event.clone(); + + let thread = std::thread::spawn(move || { + // Nothing will happen because the event is empty. + event_sender.call(|delegate| delegate.Invoke(None, 123)); + event_sender + }); + + let returned_event = thread.join().unwrap(); + assert!(Arc::ptr_eq(&event, &returned_event)); + Ok(()) +}