Skip to content

Commit

Permalink
Merge branch 'obhq:main' into main
Browse files Browse the repository at this point in the history
  • Loading branch information
SuchAFuriousDeath authored Apr 6, 2024
2 parents 3765000 + 71f433c commit b13348c
Show file tree
Hide file tree
Showing 2 changed files with 117 additions and 75 deletions.
3 changes: 3 additions & 0 deletions src/gmtx/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -8,3 +8,6 @@ libc = "0.2"

[target.'cfg(windows)'.dependencies]
windows-sys = { version = "0.52", features = ["Win32_System_Threading"] }

[target.'cfg(target_os = "macos")'.dependencies]
ulock-sys = "0.1.0"
189 changes: 114 additions & 75 deletions src/gmtx/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,10 @@
pub use self::guard::*;
use std::cell::UnsafeCell;
use std::collections::VecDeque;
use std::io::Error;
use std::marker::PhantomData;
use std::rc::Rc;
use std::sync::atomic::Ordering;
use std::sync::{Arc, Mutex, Weak};
use std::thread::Thread;
use std::sync::Arc;

mod guard;

Expand Down Expand Up @@ -110,15 +109,13 @@ unsafe impl<T: Send> Sync for Gutex<T> {}
pub struct GutexGroup {
owning: ThreadId,
active: UnsafeCell<usize>,
waiting: Mutex<VecDeque<Weak<Thread>>>,
}

impl GutexGroup {
pub fn new() -> Arc<Self> {
Arc::new(Self {
owning: ThreadId::new(0),
active: UnsafeCell::new(0),
waiting: Mutex::new(VecDeque::new()),
})
}

Expand All @@ -130,7 +127,6 @@ impl GutexGroup {
}
}

#[inline(always)]
fn lock(&self) -> GroupGuard<'_> {
// Check if the calling thread already own the lock.
let current = Self::current_thread();
Expand All @@ -140,49 +136,15 @@ impl GutexGroup {
return unsafe { GroupGuard::new(self) };
}

// Try locking without putting the current thread into the wait queue first. This will be
// much faster if the lock can be acquired immediately.
if self
.owning
.compare_exchange(0, current, Ordering::Acquire, Ordering::Relaxed)
.is_ok()
{
// SAFETY: This is safe because the current thread just acquired the lock successfully
// by the above compare_exchange().
return unsafe { GroupGuard::new(self) };
}

// Split the slow path into a separated function so this function will be likely to be
// inlined.
self.lock_with_wait_queue(current)
}

#[inline(never)]
fn lock_with_wait_queue(&self, current: RawThreadId) -> GroupGuard<'_> {
// Put the current thread into the wait queue. This need to be done before
// compare_exchange() so we don't end up parking the current thread when someone just
// release the lock after compare_exchange().
let waiting = Arc::new(std::thread::current());

self.waiting
.lock()
.unwrap()
.push_back(Arc::downgrade(&waiting));

// Acquire the lock.
while self
.owning
.compare_exchange(0, current, Ordering::Acquire, Ordering::Relaxed)
.is_err()
while let Err(owning) =
self.owning
.compare_exchange(0, current, Ordering::Acquire, Ordering::Relaxed)
{
// Wait for the lock to unlock.
std::thread::park();
unsafe { Self::wait_unlock(self.owning.as_ptr(), owning) };
}

// Remove the current thread from the waiting queue before we construct a GroupGuard because
// the destructor of GroupGuard will unpark one thread, which can be this thread.
drop(waiting);

// SAFETY: This is safe because the current thread acquire the lock successfully by the
// above compare_exchange().
unsafe { GroupGuard::new(self) }
Expand All @@ -204,6 +166,70 @@ impl GutexGroup {
fn current_thread() -> u32 {
unsafe { windows_sys::Win32::System::Threading::GetCurrentThreadId() }
}

#[cfg(target_os = "linux")]
unsafe fn wait_unlock(addr: *mut i32, owning: i32) {
use libc::{syscall, SYS_futex, EAGAIN, FUTEX_PRIVATE_FLAG, FUTEX_WAIT};

if unsafe { syscall(SYS_futex, addr, FUTEX_WAIT | FUTEX_PRIVATE_FLAG, owning, 0) } < 0 {
let e = Error::last_os_error();

if e.raw_os_error().unwrap() != EAGAIN {
panic!("FUTEX_WAIT failed: {e}");
}
}
}

#[cfg(target_os = "macos")]
unsafe fn wait_unlock(addr: *mut u64, owning: u64) {
use ulock_sys::__ulock_wait;
use ulock_sys::darwin19::UL_COMPARE_AND_WAIT64;

if __ulock_wait(UL_COMPARE_AND_WAIT64, addr.cast(), owning, 0) != 0 {
panic!("__ulock_wait() failed: {}", Error::last_os_error());
}
}

#[cfg(target_os = "windows")]
unsafe fn wait_unlock(addr: *mut u32, owning: u32) {
use windows_sys::Win32::System::Threading::{WaitOnAddress, INFINITE};

if unsafe { WaitOnAddress(addr.cast(), &owning as *const u32 as _, 4, INFINITE) } == 0 {
panic!("WaitOnAddress() failed: {}", Error::last_os_error());
}
}

#[cfg(target_os = "linux")]
unsafe fn wake_one(addr: *mut i32) {
use libc::{syscall, SYS_futex, EAGAIN, FUTEX_PRIVATE_FLAG, FUTEX_WAKE};

if unsafe { syscall(SYS_futex, addr, FUTEX_WAKE | FUTEX_PRIVATE_FLAG, 1) } < 0 {
panic!("FUTEX_WAKE failed: {}", Error::last_os_error());
}
}

#[cfg(target_os = "macos")]
unsafe fn wake_one(addr: *mut u64) {
use libc::ENOENT;
use ulock_sys::__ulock_wake;
use ulock_sys::darwin19::UL_COMPARE_AND_WAIT64;

if __ulock_wake(UL_COMPARE_AND_WAIT64, addr.cast(), 0) != 0 {
// __ulock_wake will return ENOENT if no other threads being waiting on the address.
let e = Error::last_os_error();

if e.raw_os_error().unwrap() != ENOENT {
panic!("__ulock_wake() failed: {e}");
}
}
}

#[cfg(target_os = "windows")]
unsafe fn wake_one(addr: *mut u32) {
use windows_sys::Win32::System::Threading::WakeByAddressSingle;

unsafe { WakeByAddressSingle(addr.cast()) };
}
}

unsafe impl Send for GutexGroup {}
Expand All @@ -220,7 +246,6 @@ impl<'a> GroupGuard<'a> {
/// # Safety
/// The group must be locked by the calling thread with no active references to any of its
/// field.
#[inline(always)]
unsafe fn new(group: &'a GutexGroup) -> Self {
*group.active.get() += 1;

Expand All @@ -247,44 +272,58 @@ impl<'a> Drop for GroupGuard<'a> {
// Release the lock.
self.group.owning.store(0, Ordering::Release);

// Wakeup one waiting thread.
let mut waiting = match self.group.waiting.try_lock() {
Ok(v) => v,
Err(_) => {
// There are 2 possible cases here:
//
// 1. The other thread being doing a wakeup on the bottom.
// 2. The other thread being acquiring the lock using wait queue.
//
// For the first case we don't need to do a wakeup because someone already working
// on this. For the second case that thread (or someone else that are not parking
// yet) can acquire the lock because we just release it on the above.
// That mean there is at least one thread that not yet parking, which imply that one
// of them will be successfully acquire the lock.
return;
}
};

while let Some(t) = waiting.pop_front() {
if let Some(t) = t.upgrade() {
t.unpark();
break;
}
}
unsafe { GutexGroup::wake_one(self.group.owning.as_ptr()) };
}
}

#[cfg(target_os = "linux")]
type ThreadId = std::sync::atomic::AtomicI32;
#[cfg(target_os = "linux")]
type RawThreadId = i32;

#[cfg(target_os = "macos")]
type ThreadId = std::sync::atomic::AtomicU64;
#[cfg(target_os = "macos")]
type RawThreadId = u64;

#[cfg(target_os = "windows")]
type ThreadId = std::sync::atomic::AtomicU32;
#[cfg(target_os = "windows")]
type RawThreadId = u32;

#[cfg(test)]
mod tests {
use super::*;
use std::sync::Barrier;
use std::time::Duration;

#[test]
fn group_lock() {
let b = Arc::new(Barrier::new(2));
let v = Arc::new(GutexGroup::new().spawn(0));
let mut l = v.write();
let t = std::thread::spawn({
let b = b.clone();
let v = v.clone();

move || {
// Wait for parent thread.
let mut l = v.write();

b.wait();

assert_eq!(*l, 1);

// Notify the parent thread.
std::thread::sleep(Duration::from_secs(1));

*l = 2;
}
});

// Notify the inner thread.
*l = 1;
drop(l);

// Wait for the inner thread value.
b.wait();

assert_eq!(*v.read(), 2);

t.join().unwrap();
}
}

0 comments on commit b13348c

Please sign in to comment.