Skip to content

Commit

Permalink
Improved Event implementation (#3198)
Browse files Browse the repository at this point in the history
  • Loading branch information
lifers authored Aug 16, 2024
1 parent e2a9ef4 commit f19edde
Show file tree
Hide file tree
Showing 2 changed files with 93 additions and 247 deletions.
291 changes: 60 additions & 231 deletions crates/libs/core/src/event.rs
Original file line number Diff line number Diff line change
@@ -1,20 +1,18 @@
use super::*;
use core::ffi::c_void;
use core::marker::PhantomData;
use core::mem::{size_of, transmute_copy};
use core::ptr::null_mut;
use std::sync::Mutex;
use core::{iter::once, mem::transmute_copy};
use std::sync::{Arc, RwLock};

/// A type that you can use to declare and implement an event of a specified delegate type.
///
/// The implementation is thread-safe and designed to avoid contention between events being
/// raised and delegates being added or removed.
pub struct Event<T: Interface> {
swap: Mutex<()>,
change: Mutex<()>,
delegates: Array<T>,
delegates: RwLock<Option<Arc<[Delegate<T>]>>>,
}

unsafe impl<T: Interface> Send for Event<T> {}
unsafe impl<T: Interface> Sync for Event<T> {}

impl<T: Interface> Default for Event<T> {
fn default() -> Self {
Self::new()
Expand All @@ -25,228 +23,86 @@ impl<T: Interface> Event<T> {
/// Creates a new, empty `Event<T>`.
pub fn new() -> Self {
Self {
delegates: Array::new(),
swap: Mutex::default(),
change: Mutex::default(),
delegates: RwLock::new(None),
}
}

/// Registers a delegate with the event object.
pub fn add(&mut self, delegate: &T) -> Result<i64> {
let mut _lock_free_drop = Array::new();
Ok({
let _change_lock = self.change.lock().unwrap();
let mut new_delegates = Array::with_capacity(self.delegates.len() + 1)?;
for delegate in self.delegates.as_slice() {
new_delegates.push(delegate.clone());
}
let delegate = Delegate::new(delegate)?;
let token = delegate.to_token();
new_delegates.push(delegate);
pub fn add(&self, delegate: &T) -> Result<i64> {
let new_delegate = Delegate::new(delegate)?;
let token = new_delegate.to_token();
let new_iter = once(new_delegate);
let mut guard = self.delegates.write().unwrap();

let new_list = if let Some(old_delegates) = guard.as_ref() {
Arc::from_iter(old_delegates.iter().cloned().chain(new_iter))
} else {
Arc::from_iter(new_iter)
};

let old_list = guard.replace(new_list);
drop(guard);
drop(old_list); // drop the old delegates _after_ releasing lock

let _swap_lock = self.swap.lock().unwrap();
_lock_free_drop = self.delegates.swap(new_delegates);
token
})
Ok(token)
}

/// Revokes a delegate's registration from the event object.
pub fn remove(&mut self, token: i64) -> Result<()> {
let mut _lock_free_drop = Array::new();
{
let _change_lock = self.change.lock().unwrap();
if self.delegates.is_empty() {
return Ok(());
}
let mut capacity = self.delegates.len() - 1;
let mut new_delegates = Array::new();
let mut removed = false;
if capacity == 0 {
removed = self.delegates.as_slice()[0].to_token() == token;
} else {
new_delegates = Array::with_capacity(capacity)?;
for delegate in self.delegates.as_slice() {
if !removed && delegate.to_token() == token {
removed = true;
continue;
}
if capacity == 0 {
break;
}
new_delegates.push(delegate.clone());
capacity -= 1;
}
}
if removed {
let _swap_lock = self.swap.lock().unwrap();
_lock_free_drop = self.delegates.swap(new_delegates);
pub fn remove(&self, token: i64) {
let mut guard = self.delegates.write().unwrap();
let mut old_list = None;
if let Some(old_delegates) = guard.as_ref() {
// `self.delegates` is only modified if the token is found.
if let Some(i) = old_delegates
.iter()
.position(|old_delegate| old_delegate.to_token() == token)
{
let new_list = Arc::from_iter(
old_delegates[..i]
.iter()
.chain(old_delegates[i + 1..].iter())
.cloned(),
);

old_list = guard.replace(new_list);
}
}
Ok(())
drop(guard);
drop(old_list); // drop the old delegates _after_ releasing lock
}

/// Clears the event, removing all delegates.
pub fn clear(&mut self) {
let mut _lock_free_drop = Array::new();
{
let _change_lock = self.change.lock().unwrap();
if self.delegates.is_empty() {
return;
}
let _swap_lock = self.swap.lock().unwrap();
_lock_free_drop = self.delegates.swap(Array::new());
}
pub fn clear(&self) {
let mut guard = self.delegates.write().unwrap();
let old_list = guard.take();
drop(guard);
drop(old_list); // drop the old delegates _after_ releasing lock
}

/// Invokes all of the event object's registered delegates with the provided callback.
pub fn call<F: FnMut(&T) -> Result<()>>(&mut self, mut callback: F) -> Result<()> {
let lock_free_calls = {
let _swap_lock = self.swap.lock().unwrap();
self.delegates.clone()
pub fn call<F: FnMut(&T) -> Result<()>>(&self, mut callback: F) {
let delegates = {
let guard = self.delegates.read().unwrap();
if let Some(delegates) = guard.as_ref() {
delegates.clone()
} else {
// No delegates to call.
return;
}
// <-- lock is released here
};
for delegate in lock_free_calls.as_slice() {

for delegate in delegates.iter() {
if let Err(error) = delegate.call(&mut callback) {
const RPC_E_SERVER_UNAVAILABLE: HRESULT = HRESULT(-2147023174); // HRESULT_FROM_WIN32(RPC_S_SERVER_UNAVAILABLE)
if matches!(
error.code(),
imp::RPC_E_DISCONNECTED | imp::JSCRIPT_E_CANTEXECUTE | RPC_E_SERVER_UNAVAILABLE
) {
self.remove(delegate.to_token())?;
self.remove(delegate.to_token());
}
}
}
Ok(())
}
}

/// A thread-safe reference-counted array of delegates.
struct Array<T: Interface> {
buffer: *mut Buffer<T>,
len: usize,
_phantom: PhantomData<T>,
}

impl<T: Interface> Default for Array<T> {
fn default() -> Self {
Self::new()
}
}

impl<T: Interface> Array<T> {
/// Creates a new, empty `Array<T>` with no capacity.
fn new() -> Self {
Self {
buffer: null_mut(),
len: 0,
_phantom: PhantomData,
}
}

/// Creates a new, empty `Array<T>` with the specified capacity.
fn with_capacity(capacity: usize) -> Result<Self> {
Ok(Self {
buffer: Buffer::new(capacity)?,
len: 0,
_phantom: PhantomData,
})
}

/// Swaps the contents of two `Array<T>` objects.
fn swap(&mut self, mut other: Self) -> Self {
unsafe { core::ptr::swap(&mut self.buffer, &mut other.buffer) };
core::mem::swap(&mut self.len, &mut other.len);
other
}

/// Returns `true` if the array contains no delegates.
fn is_empty(&self) -> bool {
self.len == 0
}

/// Returns the number of delegates in the array.
fn len(&self) -> usize {
self.len
}

/// Appends a delegate to the back of the array.
fn push(&mut self, delegate: Delegate<T>) {
unsafe {
(*self.buffer).as_mut_ptr().add(self.len).write(delegate);
self.len += 1;
}
}

/// Returns a slice containing of all delegates.
fn as_slice(&self) -> &[Delegate<T>] {
if self.is_empty() {
&[]
} else {
unsafe { core::slice::from_raw_parts((*self.buffer).as_ptr(), self.len) }
}
}

/// Returns a mutable slice of all delegates.
fn as_mut_slice(&mut self) -> &mut [Delegate<T>] {
if self.is_empty() {
&mut []
} else {
unsafe { core::slice::from_raw_parts_mut((*self.buffer).as_mut_ptr(), self.len) }
}
}
}

impl<T: Interface> Clone for Array<T> {
fn clone(&self) -> Self {
if !self.is_empty() {
unsafe { (*self.buffer).0.add_ref() };
}
Self {
buffer: self.buffer,
len: self.len,
_phantom: PhantomData,
}
}
}

impl<T: Interface> Drop for Array<T> {
fn drop(&mut self) {
unsafe {
if !self.is_empty() && (*self.buffer).0.release() == 0 {
core::ptr::drop_in_place(self.as_mut_slice());
heap_free(self.buffer as _)
}
}
}
}

/// A reference-counted buffer.
#[repr(C)]
#[repr(align(8))]
struct Buffer<T>(imp::RefCount, PhantomData<T>);

impl<T: Interface> Buffer<T> {
/// Creates a new `Buffer` with the specified size in bytes.
fn new(len: usize) -> Result<*mut Self> {
if len == 0 {
Ok(null_mut())
} else {
let alloc_size = size_of::<Self>() + len * size_of::<Delegate<T>>();
let header = heap_alloc(alloc_size)? as *mut Self;
unsafe {
header.write(Self(imp::RefCount::new(1), PhantomData));
}
Ok(header)
}
}

/// Returns a raw pointer to the buffer's contents. The resulting pointer might be uninititalized.
fn as_ptr(&self) -> *const Delegate<T> {
unsafe { (self as *const Self).add(1) as *const _ }
}

/// Returns a raw mutable pointer to the buffer's contents. The resulting pointer might be uninititalized.
fn as_mut_ptr(&mut self) -> *mut Delegate<T> {
unsafe { (self as *mut Self).add(1) as *mut _ }
}
}

Expand Down Expand Up @@ -286,30 +142,3 @@ impl<T: Interface> Delegate<T> {
}
}
}

/// Allocate memory of size `bytes` using `malloc` - the `Event` implementation does not
/// need to use any particular allocator so `HeapAlloc` need not be used.
fn heap_alloc(bytes: usize) -> crate::Result<*mut c_void> {
let ptr: *mut c_void = unsafe {
extern "C" {
fn malloc(bytes: usize) -> *mut c_void;
}

malloc(bytes)
};

if ptr.is_null() {
Err(Error::from_hresult(imp::E_OUTOFMEMORY))
} else {
Ok(ptr)
}
}

/// Free memory allocated by `heap_alloc`.
unsafe fn heap_free(ptr: *mut c_void) {
extern "C" {
fn free(ptr: *mut c_void);
}

free(ptr);
}
Loading

0 comments on commit f19edde

Please sign in to comment.