Skip to content
This repository has been archived by the owner on Nov 5, 2018. It is now read-only.

Commit

Permalink
Support nested scoped spawns (#47)
Browse files Browse the repository at this point in the history
Closes #46.

cc @pedrocr
  • Loading branch information
Stjepan Glavina authored Oct 25, 2018
1 parent 6d261f5 commit 385bc0c
Show file tree
Hide file tree
Showing 3 changed files with 272 additions and 183 deletions.
77 changes: 74 additions & 3 deletions src/sync/sharded_lock.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,13 +5,14 @@
//! traditional reader-writer locks.
use std::cell::UnsafeCell;
use std::collections::HashMap;
use std::marker::PhantomData;
use std::mem;
use std::ops::{Deref, DerefMut};
use std::thread::{self, ThreadId};

use num_cpus;
use parking_lot::{RwLock, RwLockReadGuard, RwLockWriteGuard};
use thread;
use parking_lot::{Mutex, RwLock, RwLockReadGuard, RwLockWriteGuard};

use CachePadded;

Expand Down Expand Up @@ -66,7 +67,7 @@ impl<T> ShardedLock<T> {
pub fn read(&self) -> ShardedLockReadGuard<T> {
// Take the current thread index and map it to a shard index. Thread indices will tend to
// distribute shards among threads equally, thus reducing contention due to read-locking.
let current_index = thread::current_index().unwrap_or(0);
let current_index = current_index().unwrap_or(0);
let shard_index = current_index & (self.shards.len() - 1);

ShardedLockReadGuard {
Expand Down Expand Up @@ -145,3 +146,73 @@ impl<'a, T> DerefMut for ShardedLockWriteGuard<'a, T> {
unsafe { &mut *self.parent.value.get() }
}
}

/// Returns a `usize` that identifies the current thread.
///
/// Each thread is associated with an 'index'. While there are no particular guarantees, indices
/// usually tend to be consecutive numbers between 0 and the number of running threads.
///
/// Since this function accesses TLS, `None` might be returned if the current thread's TLS is
/// tearing down.
#[inline]
fn current_index() -> Option<usize> {
REGISTRATION.try_with(|reg| reg.index).ok()
}

/// The global registry keeping track of registered threads and indices.
struct ThreadIndices {
/// Mapping from `ThreadId` to thread index.
mapping: HashMap<ThreadId, usize>,

/// A list of free indices.
free_list: Vec<usize>,

/// The next index to allocate if the free list is empty.
next_index: usize,
}

lazy_static! {
static ref THREAD_INDICES: Mutex<ThreadIndices> = Mutex::new(ThreadIndices {
mapping: HashMap::new(),
free_list: Vec::new(),
next_index: 0,
});
}

/// A registration of a thread with an index.
///
/// When dropped, unregisters the thread and frees the reserved index.
struct Registration {
index: usize,
thread_id: ThreadId,
}

impl Drop for Registration {
fn drop(&mut self) {
let mut indices = THREAD_INDICES.lock();
indices.mapping.remove(&self.thread_id);
indices.free_list.push(self.index);
}
}

thread_local! {
static REGISTRATION: Registration = {
let thread_id = thread::current().id();
let mut indices = THREAD_INDICES.lock();

let index = match indices.free_list.pop() {
Some(i) => i,
None => {
let i = indices.next_index;
indices.next_index += 1;
i
}
};
indices.mapping.insert(thread_id, index);

Registration {
index,
thread_id,
}
};
}
Loading

0 comments on commit 385bc0c

Please sign in to comment.