From 71f433c290490e0ac3db69b56a0fe26a3cc1f141 Mon Sep 17 00:00:00 2001 From: Putta Khunchalee Date: Sat, 6 Apr 2024 19:49:11 +0700 Subject: [PATCH] Replaces Gutex thread park/unpark with Futex (#807) --- src/gmtx/Cargo.toml | 3 + src/gmtx/src/lib.rs | 189 ++++++++++++++++++++++++++------------------ 2 files changed, 117 insertions(+), 75 deletions(-) diff --git a/src/gmtx/Cargo.toml b/src/gmtx/Cargo.toml index 5e345b3d4..cf8694e1e 100644 --- a/src/gmtx/Cargo.toml +++ b/src/gmtx/Cargo.toml @@ -8,3 +8,6 @@ libc = "0.2" [target.'cfg(windows)'.dependencies] windows-sys = { version = "0.52", features = ["Win32_System_Threading"] } + +[target.'cfg(target_os = "macos")'.dependencies] +ulock-sys = "0.1.0" diff --git a/src/gmtx/src/lib.rs b/src/gmtx/src/lib.rs index 46a96b70d..fd747e166 100644 --- a/src/gmtx/src/lib.rs +++ b/src/gmtx/src/lib.rs @@ -1,11 +1,10 @@ pub use self::guard::*; use std::cell::UnsafeCell; -use std::collections::VecDeque; +use std::io::Error; use std::marker::PhantomData; use std::rc::Rc; use std::sync::atomic::Ordering; -use std::sync::{Arc, Mutex, Weak}; -use std::thread::Thread; +use std::sync::Arc; mod guard; @@ -110,7 +109,6 @@ unsafe impl Sync for Gutex {} pub struct GutexGroup { owning: ThreadId, active: UnsafeCell, - waiting: Mutex>>, } impl GutexGroup { @@ -118,7 +116,6 @@ impl GutexGroup { Arc::new(Self { owning: ThreadId::new(0), active: UnsafeCell::new(0), - waiting: Mutex::new(VecDeque::new()), }) } @@ -130,7 +127,6 @@ impl GutexGroup { } } - #[inline(always)] fn lock(&self) -> GroupGuard<'_> { // Check if the calling thread already own the lock. let current = Self::current_thread(); @@ -140,49 +136,15 @@ impl GutexGroup { return unsafe { GroupGuard::new(self) }; } - // Try locking without putting the current thread into the wait queue first. This will be - // much faster if the lock can be acquired immediately. - if self - .owning - .compare_exchange(0, current, Ordering::Acquire, Ordering::Relaxed) - .is_ok() - { - // SAFETY: This is safe because the current thread just acquired the lock successfully - // by the above compare_exchange(). - return unsafe { GroupGuard::new(self) }; - } - - // Split the slow path into a separated function so this function will be likely to be - // inlined. - self.lock_with_wait_queue(current) - } - - #[inline(never)] - fn lock_with_wait_queue(&self, current: RawThreadId) -> GroupGuard<'_> { - // Put the current thread into the wait queue. This need to be done before - // compare_exchange() so we don't end up parking the current thread when someone just - // release the lock after compare_exchange(). - let waiting = Arc::new(std::thread::current()); - - self.waiting - .lock() - .unwrap() - .push_back(Arc::downgrade(&waiting)); - // Acquire the lock. - while self - .owning - .compare_exchange(0, current, Ordering::Acquire, Ordering::Relaxed) - .is_err() + while let Err(owning) = + self.owning + .compare_exchange(0, current, Ordering::Acquire, Ordering::Relaxed) { // Wait for the lock to unlock. - std::thread::park(); + unsafe { Self::wait_unlock(self.owning.as_ptr(), owning) }; } - // Remove the current thread from the waiting queue before we construct a GroupGuard because - // the destructor of GroupGuard will unpark one thread, which can be this thread. - drop(waiting); - // SAFETY: This is safe because the current thread acquire the lock successfully by the // above compare_exchange(). unsafe { GroupGuard::new(self) } @@ -204,6 +166,70 @@ impl GutexGroup { fn current_thread() -> u32 { unsafe { windows_sys::Win32::System::Threading::GetCurrentThreadId() } } + + #[cfg(target_os = "linux")] + unsafe fn wait_unlock(addr: *mut i32, owning: i32) { + use libc::{syscall, SYS_futex, EAGAIN, FUTEX_PRIVATE_FLAG, FUTEX_WAIT}; + + if unsafe { syscall(SYS_futex, addr, FUTEX_WAIT | FUTEX_PRIVATE_FLAG, owning, 0) } < 0 { + let e = Error::last_os_error(); + + if e.raw_os_error().unwrap() != EAGAIN { + panic!("FUTEX_WAIT failed: {e}"); + } + } + } + + #[cfg(target_os = "macos")] + unsafe fn wait_unlock(addr: *mut u64, owning: u64) { + use ulock_sys::__ulock_wait; + use ulock_sys::darwin19::UL_COMPARE_AND_WAIT64; + + if __ulock_wait(UL_COMPARE_AND_WAIT64, addr.cast(), owning, 0) != 0 { + panic!("__ulock_wait() failed: {}", Error::last_os_error()); + } + } + + #[cfg(target_os = "windows")] + unsafe fn wait_unlock(addr: *mut u32, owning: u32) { + use windows_sys::Win32::System::Threading::{WaitOnAddress, INFINITE}; + + if unsafe { WaitOnAddress(addr.cast(), &owning as *const u32 as _, 4, INFINITE) } == 0 { + panic!("WaitOnAddress() failed: {}", Error::last_os_error()); + } + } + + #[cfg(target_os = "linux")] + unsafe fn wake_one(addr: *mut i32) { + use libc::{syscall, SYS_futex, EAGAIN, FUTEX_PRIVATE_FLAG, FUTEX_WAKE}; + + if unsafe { syscall(SYS_futex, addr, FUTEX_WAKE | FUTEX_PRIVATE_FLAG, 1) } < 0 { + panic!("FUTEX_WAKE failed: {}", Error::last_os_error()); + } + } + + #[cfg(target_os = "macos")] + unsafe fn wake_one(addr: *mut u64) { + use libc::ENOENT; + use ulock_sys::__ulock_wake; + use ulock_sys::darwin19::UL_COMPARE_AND_WAIT64; + + if __ulock_wake(UL_COMPARE_AND_WAIT64, addr.cast(), 0) != 0 { + // __ulock_wake will return ENOENT if no other threads being waiting on the address. + let e = Error::last_os_error(); + + if e.raw_os_error().unwrap() != ENOENT { + panic!("__ulock_wake() failed: {e}"); + } + } + } + + #[cfg(target_os = "windows")] + unsafe fn wake_one(addr: *mut u32) { + use windows_sys::Win32::System::Threading::WakeByAddressSingle; + + unsafe { WakeByAddressSingle(addr.cast()) }; + } } unsafe impl Send for GutexGroup {} @@ -220,7 +246,6 @@ impl<'a> GroupGuard<'a> { /// # Safety /// The group must be locked by the calling thread with no active references to any of its /// field. - #[inline(always)] unsafe fn new(group: &'a GutexGroup) -> Self { *group.active.get() += 1; @@ -247,44 +272,58 @@ impl<'a> Drop for GroupGuard<'a> { // Release the lock. self.group.owning.store(0, Ordering::Release); - // Wakeup one waiting thread. - let mut waiting = match self.group.waiting.try_lock() { - Ok(v) => v, - Err(_) => { - // There are 2 possible cases here: - // - // 1. The other thread being doing a wakeup on the bottom. - // 2. The other thread being acquiring the lock using wait queue. - // - // For the first case we don't need to do a wakeup because someone already working - // on this. For the second case that thread (or someone else that are not parking - // yet) can acquire the lock because we just release it on the above. - // That mean there is at least one thread that not yet parking, which imply that one - // of them will be successfully acquire the lock. - return; - } - }; - - while let Some(t) = waiting.pop_front() { - if let Some(t) = t.upgrade() { - t.unpark(); - break; - } - } + unsafe { GutexGroup::wake_one(self.group.owning.as_ptr()) }; } } #[cfg(target_os = "linux")] type ThreadId = std::sync::atomic::AtomicI32; -#[cfg(target_os = "linux")] -type RawThreadId = i32; #[cfg(target_os = "macos")] type ThreadId = std::sync::atomic::AtomicU64; -#[cfg(target_os = "macos")] -type RawThreadId = u64; #[cfg(target_os = "windows")] type ThreadId = std::sync::atomic::AtomicU32; -#[cfg(target_os = "windows")] -type RawThreadId = u32; + +#[cfg(test)] +mod tests { + use super::*; + use std::sync::Barrier; + use std::time::Duration; + + #[test] + fn group_lock() { + let b = Arc::new(Barrier::new(2)); + let v = Arc::new(GutexGroup::new().spawn(0)); + let mut l = v.write(); + let t = std::thread::spawn({ + let b = b.clone(); + let v = v.clone(); + + move || { + // Wait for parent thread. + let mut l = v.write(); + + b.wait(); + + assert_eq!(*l, 1); + + // Notify the parent thread. + std::thread::sleep(Duration::from_secs(1)); + + *l = 2; + } + }); + + // Notify the inner thread. + *l = 1; + drop(l); + + // Wait for the inner thread value. + b.wait(); + + assert_eq!(*v.read(), 2); + + t.join().unwrap(); + } +}