Add a sharded reader-writer lock (#48)
This PR introduces `ShardedLock`, which is a variant of `RwLock` where concurrent read operations don't contend as much.

`ShardedLock` consists of a bunch of smaller `RwLock`s called *shards*.

A read operation read-locks a single shard only. The shard is chosen based on the current thread so that contention is reduced as much as possible.

A write operation has to write-lock every shard in order. That means we trade faster reads for slower writes.

Another way of looking at it: sharded locking is just a poor man's hardware lock elision. While `parking_lot`'s `RwLock` does use HLE on platforms that support it, we have to resort to `ShardedLock` on the others.

This PR is basically just a port of [another one](tokio-rs/tokio#517) that added a sharded RW lock to `tokio-reactor` in order to improve the performance of contended reads. I was told that `ShardedLock` would also be useful in [salsa](https://github.com/nikomatsakis/salsa). Let's add it to Crossbeam so that anyone can use it.
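For a sense of the new API, here's a minimal usage sketch (not part of this diff; note that unlike `std::sync::RwLock`, `read` and `write` return guards directly rather than a poisoning `Result`):

```rust
extern crate crossbeam_utils;

use std::sync::Arc;
use std::thread;

use crossbeam_utils::sync::ShardedLock;

fn main() {
    let lock = Arc::new(ShardedLock::new(0u64));

    // Each reader locks only the shard mapped to its thread, so readers
    // rarely contend with one another.
    let readers: Vec<_> = (0..4)
        .map(|_| {
            let lock = Arc::clone(&lock);
            thread::spawn(move || println!("read: {}", *lock.read()))
        })
        .collect();
    for r in readers {
        r.join().unwrap();
    }

    // A writer must write-lock every shard, so writes cost more.
    *lock.write() += 1;
    assert_eq!(*lock.read(), 1);
}
```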
Stjepan Glavina authored Oct 1, 2018
1 parent 6547b51 commit 2860dd3
Showing 7 changed files with 238 additions and 3 deletions.
.travis.yml (1 addition, 1 deletion)
@@ -4,7 +4,7 @@ rust:
 - stable
 - beta
 - nightly
-- 1.25.0
+- 1.26.0

 script:
 - cargo build
Cargo.toml (3 additions)
@@ -18,3 +18,6 @@ nightly = []

 [dependencies]
 cfg-if = "0.1"
+lazy_static = "1.1.0"
+num_cpus = "1.8.0"
+parking_lot = "0.6.4"
README.md (1 addition, 1 deletion)
@@ -22,7 +22,7 @@ Next, add this to your crate:
 extern crate crossbeam_utils;
 ```

-The minimum required Rust version is 1.25.
+The minimum required Rust version is 1.26.

 ## License

src/lib.rs (9 additions)
@@ -5,11 +5,20 @@
 extern crate cfg_if;
 #[cfg(feature = "use_std")]
 extern crate core;
+#[cfg(feature = "use_std")]
+extern crate parking_lot;
+#[cfg(feature = "use_std")]
+extern crate num_cpus;
+#[cfg(feature = "use_std")]
+#[macro_use]
+extern crate lazy_static;

 mod cache_padded;

 pub mod atomic;
 #[cfg(feature = "use_std")]
 pub mod thread;
+#[cfg(feature = "use_std")]
+pub mod sync;

 pub use cache_padded::CachePadded;
src/sync/mod.rs (5 additions)
@@ -0,0 +1,5 @@
//! Synchronization primitives.

mod sharded_lock;

pub use self::sharded_lock::{ShardedLock, ShardedLockReadGuard, ShardedLockWriteGuard};
src/sync/sharded_lock.rs (147 additions)
@@ -0,0 +1,147 @@
//! A scalable reader-writer lock.
//!
//! This implementation makes read operations faster and more scalable due to less contention,
//! while making write operations slower. It also incurs much higher memory overhead than
//! traditional reader-writer locks.

use std::cell::UnsafeCell;
use std::marker::PhantomData;
use std::mem;
use std::ops::{Deref, DerefMut};

use num_cpus;
use parking_lot::{RwLock, RwLockReadGuard, RwLockWriteGuard};
use thread;

use CachePadded;

/// A scalable reader-writer lock.
///
/// This type of lock allows a number of readers or at most one writer at any point in time. The
/// write portion of this lock typically allows modification of the underlying data (exclusive
/// access) and the read portion of this lock typically allows for read-only access (shared
/// access).
///
/// This reader-writer lock differs from typical implementations in that it internally creates a
/// list of reader-writer locks called 'shards'. Shards are aligned and padded to the cache line
/// size.
///
/// Read operations lock only one shard specific to the current thread, while write operations lock
/// every shard in succession. This strategy makes concurrent read operations faster due to less
/// contention, but write operations are slower due to the increased amount of locking.
pub struct ShardedLock<T> {
    /// A list of locks protecting the internal data.
    shards: Vec<CachePadded<RwLock<()>>>,

    /// The internal data.
    value: UnsafeCell<T>,
}

unsafe impl<T: Send> Send for ShardedLock<T> {}
unsafe impl<T: Send + Sync> Sync for ShardedLock<T> {}

impl<T> ShardedLock<T> {
    /// Creates a new `ShardedLock` initialized with `value`.
    pub fn new(value: T) -> ShardedLock<T> {
        // The number of shards is a power of two so that the modulo operation in `read` becomes a
        // simple bitwise "and".
        let num_shards = num_cpus::get().next_power_of_two();

        ShardedLock {
            shards: (0..num_shards)
                .map(|_| CachePadded::new(RwLock::new(())))
                .collect(),
            value: UnsafeCell::new(value),
        }
    }

    /// Locks with shared read access, blocking the current thread until it can be acquired.
    ///
    /// The calling thread will be blocked until there are no more writers which hold the lock.
    /// There may be other readers currently inside the lock when this method returns. This method
    /// does not provide any guarantees with respect to the ordering of whether contentious readers
    /// or writers will acquire the lock first.
    ///
    /// Returns an RAII guard which will drop the read access of this lock when dropped.
    pub fn read(&self) -> ShardedLockReadGuard<T> {
        // Take the current thread index and map it to a shard index. Thread indices will tend to
        // distribute shards among threads equally, thus reducing contention due to read-locking.
        let current_index = thread::current_index().unwrap_or(0);
        let shard_index = current_index & (self.shards.len() - 1);

        ShardedLockReadGuard {
            parent: self,
            _guard: self.shards[shard_index].read(),
            _marker: PhantomData,
        }
    }

    /// Locks with exclusive write access, blocking the current thread until it can be acquired.
    ///
    /// This function will not return while other writers or other readers currently have access
    /// to the lock.
    ///
    /// Returns an RAII guard which will drop the write access of this lock when dropped.
    pub fn write(&self) -> ShardedLockWriteGuard<T> {
        // Write-lock each shard in succession.
        for shard in &self.shards {
            // The write guard is forgotten, but the lock will be manually unlocked in `drop`.
            mem::forget(shard.write());
        }

        ShardedLockWriteGuard {
            parent: self,
            _marker: PhantomData,
        }
    }
}

/// A guard used to release the shared read access of a `ShardedLock` when dropped.
pub struct ShardedLockReadGuard<'a, T: 'a> {
    parent: &'a ShardedLock<T>,
    _guard: RwLockReadGuard<'a, ()>,
    _marker: PhantomData<RwLockReadGuard<'a, T>>,
}

unsafe impl<'a, T: Sync> Sync for ShardedLockReadGuard<'a, T> {}

impl<'a, T> Deref for ShardedLockReadGuard<'a, T> {
    type Target = T;

    fn deref(&self) -> &T {
        unsafe { &*self.parent.value.get() }
    }
}

/// A guard used to release the exclusive write access of a `ShardedLock` when dropped.
pub struct ShardedLockWriteGuard<'a, T: 'a> {
    parent: &'a ShardedLock<T>,
    _marker: PhantomData<RwLockWriteGuard<'a, T>>,
}

unsafe impl<'a, T: Sync> Sync for ShardedLockWriteGuard<'a, T> {}

impl<'a, T> Drop for ShardedLockWriteGuard<'a, T> {
    fn drop(&mut self) {
        // Unlock the shards in reverse order of locking.
        for shard in self.parent.shards.iter().rev() {
            unsafe {
                (**shard).force_unlock_write();
            }
        }
    }
}

impl<'a, T> Deref for ShardedLockWriteGuard<'a, T> {
    type Target = T;

    fn deref(&self) -> &T {
        unsafe { &*self.parent.value.get() }
    }
}

impl<'a, T> DerefMut for ShardedLockWriteGuard<'a, T> {
    fn deref_mut(&mut self) -> &mut T {
        unsafe { &mut *self.parent.value.get() }
    }
}
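Two implementation details above are worth spelling out. Because `new` rounds the shard count up to a power of two, the expression `current_index & (self.shards.len() - 1)` in `read` equals `current_index % self.shards.len()` but needs no division: for any power of two `n`, `i % n == i & (n - 1)`. A tiny standalone check of that identity (illustrative only, not part of the commit):

```rust
fn main() {
    // Same rounding as `ShardedLock::new`, with a made-up CPU count of 6.
    let num_shards = 6usize.next_power_of_two();
    assert_eq!(num_shards, 8);

    // For a power-of-two shard count, the mask picks out `i % num_shards`.
    for i in 0..64 {
        assert_eq!(i % num_shards, i & (num_shards - 1));
    }
    println!("mask {:#b} selects the shard index", num_shards - 1);
}
```

The other detail is the write path: `write` forgets each shard's guard with `mem::forget`, and `ShardedLockWriteGuard`'s `Drop` impl releases the shards with `force_unlock_write`, so the guard doesn't need to store one `RwLockWriteGuard` per shard. And since every writer acquires the shards in the same (vector) order, two concurrent writers cannot deadlock against each other.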
src/thread.rs (72 additions, 1 deletion)
@@ -109,13 +109,14 @@
 use std::any::Any;
 use std::cell::RefCell;
+use std::collections::HashMap;
 use std::fmt;
 use std::io;
 use std::marker::PhantomData;
 use std::mem;
 use std::panic;
 use std::sync::{Arc, Mutex};
-use std::thread;
+use std::thread::{self, ThreadId};

 /// Like [`std::thread::spawn`], but without lifetime bounds on the closure.
 ///
@@ -377,3 +378,73 @@ impl<T, F: FnOnce() -> T> FnBox<T> for F {
        (*self)()
    }
}

/// Returns a `usize` that identifies the current thread.
///
/// Each thread is associated with an 'index'. While there are no particular guarantees, indices
/// usually tend to be consecutive numbers between 0 and the number of running threads.
///
/// Since this function accesses TLS, `None` might be returned if the current thread's TLS is
/// being torn down.
#[inline]
pub fn current_index() -> Option<usize> {
    REGISTRATION.try_with(|reg| reg.index).ok()
}

/// The global registry keeping track of registered threads and indices.
struct ThreadIndices {
    /// Mapping from `ThreadId` to thread index.
    mapping: HashMap<ThreadId, usize>,

    /// A list of free indices.
    free_list: Vec<usize>,

    /// The next index to allocate if the free list is empty.
    next_index: usize,
}

lazy_static! {
    static ref THREAD_INDICES: Mutex<ThreadIndices> = Mutex::new(ThreadIndices {
        mapping: HashMap::new(),
        free_list: Vec::new(),
        next_index: 0,
    });
}

/// A registration of a thread with an index.
///
/// When dropped, unregisters the thread and frees the reserved index.
struct Registration {
    index: usize,
    thread_id: ThreadId,
}

impl Drop for Registration {
    fn drop(&mut self) {
        let mut indices = THREAD_INDICES.lock().unwrap();
        indices.mapping.remove(&self.thread_id);
        indices.free_list.push(self.index);
    }
}

thread_local! {
    static REGISTRATION: Registration = {
        let thread_id = thread::current().id();
        let mut indices = THREAD_INDICES.lock().unwrap();

        let index = match indices.free_list.pop() {
            Some(i) => i,
            None => {
                let i = indices.next_index;
                indices.next_index += 1;
                i
            }
        };
        indices.mapping.insert(thread_id, index);

        Registration {
            index,
            thread_id,
        }
    };
}
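To make the registry's behavior concrete, here is a sketch (a hypothetical program, but consistent with the free-list logic above): a thread receives the lowest free index the first time it calls `current_index`, and its `Registration` returns that index to the free list when the thread exits.

```rust
extern crate crossbeam_utils;

use std::thread;

use crossbeam_utils::thread::current_index;

fn main() {
    // The main thread registers lazily on its first call.
    let main_index = current_index().unwrap();

    let child_index = thread::spawn(|| current_index().unwrap())
        .join()
        .unwrap();

    // Both threads were alive at the same time, so their indices differ.
    assert_ne!(main_index, child_index);

    // The child has exited, so its index is back on the free list and may
    // be handed out to the next thread that registers.
    let next_index = thread::spawn(|| current_index().unwrap())
        .join()
        .unwrap();
    println!("child: {}, next: {}", child_index, next_index);
}
```

Keeping indices dense this way is what lets `ShardedLock::read` spread running threads evenly across a small, fixed set of shards.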
