Skip to content

Commit

Permalink
Update homeworks for next semester.
Browse files Browse the repository at this point in the history
  • Loading branch information
Lee-Janggun committed Feb 25, 2024
1 parent c2c0d50 commit 1829620
Show file tree
Hide file tree
Showing 33 changed files with 547 additions and 797 deletions.
13 changes: 9 additions & 4 deletions homework/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -4,15 +4,20 @@ version = "0.1.0"
authors = ["Jeehoon Kang <[email protected]>"]
edition = "2021"

[[bin]]
name = "hello_server"
path = "src/bin/hello_server.rs"
required-features = ["build-bin"]

[features]
build-bin = ["ctrlc"]
check-loom = ["loom"]

[dependencies]
cfg-if = "1.0.0"
crossbeam-channel = "0.5.8"
crossbeam-epoch = "0.9.15"
crossbeam-utils = "0.8.16"
ctrlc = "3.4.1"
crossbeam-channel = "0.5.10"
crossbeam-epoch = "0.9.17"
ctrlc = { version = "3.4.2", optional = true }
cs431 = { git = "https://github.com/kaist-cp/cs431" }
# cs431 = { path = "../cs431" }
loom = { version = "0.7.1", optional = true }
Expand Down
10 changes: 5 additions & 5 deletions homework/doc/list_set.md
Original file line number Diff line number Diff line change
Expand Up @@ -5,27 +5,27 @@ Suppose you want a set data structure that supports concurrent operations.
The simplest possible approach would be taking a non-concurrent set implementation and protecting it with a global lock.
However, this is not a great idea if the set is accessed frequently because a thread's operation blocks all the other threads' operations.

In this homework, you will write two implementations of the set data structures based on singly linked list protected by fine-grained locks.
In this homework, you will write two implementations of the set data structure based on singly linked list protected by fine-grained locks.
* The nodes in the list are sorted by their value, so that one can efficiently check if a value is in the set.
* Each node has its own lock that protects its `next` field.
When traversing the list, the locks are acquired and released in the hand-over-hand manner.
This allows multiple operations to run more concurrently.

You will implement two variants.
* In `list_set/fine_grained.rs`, the lock is the usual `Mutex`.
* In `list_set/optimistic_fine_grained.rs`, the lock is `SeqLock`.
* In `list_set/optimistic_fine_grained.rs`, the lock is a `SeqLock`.
This allows read operations to run optimistically without actually locking.
Therefore, read operations are more efficient in read-mostly scenarios, and
they do not block other operations.
However, you need to take more care to get it correct.
However, more care must be taken to ensure correctness.
* You need to validate read operations and handle the failure.
* Do not use `ReadGuard::restart()`.
Using this correctly requires some extra synchronization
(to be covered in lock-free list lecture),
which makes `SeqLock` somewhat pointless.
The tests assume that `ReadGuard::restart()` is not used.
* Since each node can be read and modified concurrently,
you should use atomic operations to avoid data race.
you should use atomic operations to avoid data races.
Specifically, you will use `crossbeam_epoch`'s `Atomic<T>` type
(instead of `std::sync::AtomicPtr<T>`, due to the next issue).
For `Ordering`, use `SeqCst` everywhere.
Expand Down Expand Up @@ -71,4 +71,4 @@ cd cs431/homework
ls ./target/hw-list_set.zip
```

Submit `list_set.zip` to gg.
Submit `hw-list_set.zip` to gg.
6 changes: 2 additions & 4 deletions homework/scripts/grade-utils.sh
Original file line number Diff line number Diff line change
Expand Up @@ -43,12 +43,10 @@ export -f grep_skip_comment
run_linters() {
cargo fmt -- --check
local FMT_ERR=$?
# FIXME: `clippy::needless_pass_by_ref_mut` has false positives that does not go well with skeleton code.
# cargo clippy -- -D warnings
cargo +nightly clippy -- -D warnings -A clippy::needless_pass_by_ref_mut
cargo +nightly clippy -- -D warnings
local CLIPPY_ERR=$?
[ "$FMT_ERR" -ne 0 ] && echo_err 'Please format your code with `cargo fmt` first.'
[ "$CLIPPY_ERR" -ne 0 ] && echo_err 'Please fix the issues from `cargo clippy` first.'
[ "$CLIPPY_ERR" -ne 0 ] && echo_err 'Please fix the issues from `cargo +nightly clippy -- -D warnings` first.'
return $(( FMT_ERR || CLIPPY_ERR ))
}
export -f run_linters
Expand Down
80 changes: 2 additions & 78 deletions homework/src/adt.rs
Original file line number Diff line number Diff line change
@@ -1,42 +1,12 @@
use core::marker::PhantomData;
use crossbeam_epoch::Guard;
use cs431::lock::{Lock, RawLock};
use rand::{distributions::Alphanumeric, rngs::ThreadRng, Rng};

/// Trait for a sequential key-value map.
pub trait SequentialMap<K: ?Sized, V> {
/// Lookups a key.
fn lookup<'a>(&'a self, key: &'a K) -> Option<&'a V>;

/// Inserts a key-value pair.
// fn insert<'a>(&'a mut self, key: &'a K, value: V) -> Result<&'a mut V, (&'a mut V, V)>;
fn insert<'a>(&'a mut self, key: &'a K, value: V) -> Result<(), V>;

/// Deletes a key, returning the value.
fn delete(&mut self, key: &K) -> Result<V, ()>;
}

/// Trait for a concurrent key-value map.
pub trait ConcurrentMap<K: ?Sized, V> {
/// Lookups a key.
fn lookup<'a, F, R>(&'a self, key: &'a K, guard: &'a Guard, f: F) -> R
where
F: FnOnce(Option<&V>) -> R;

/// Inserts a key-value pair.
fn insert<'a>(&'a self, key: &'a K, value: V, guard: &'a Guard) -> Result<(), V>;

/// Deletes the given key and returns its value.
fn delete(&self, key: &K, guard: &Guard) -> Result<V, ()>;
}

/// Trait for a nonblocking key-value map.
pub trait NonblockingMap<K: ?Sized, V> {
/// Lookups the given key to get the reference to its value.
fn lookup<'a>(&'a self, key: &K, guard: &'a Guard) -> Option<&'a V>;

/// Inserts a key-value pair.
fn insert(&self, key: &K, value: V, guard: &Guard) -> Result<(), V>;
fn insert(&self, key: K, value: V, guard: &Guard) -> Result<(), V>;

/// Deletes the given key and returns a reference to its value.
///
Expand All @@ -45,53 +15,7 @@ pub trait NonblockingMap<K: ?Sized, V> {
fn delete<'a>(&'a self, key: &K, guard: &'a Guard) -> Result<&'a V, ()>;
}

impl<K: ?Sized, V, L: RawLock, M> ConcurrentMap<K, V> for Lock<L, M>
where
M: SequentialMap<K, V>,
{
fn lookup<'a, F, R>(&'a self, key: &'a K, _guard: &'a Guard, f: F) -> R
where
F: FnOnce(Option<&V>) -> R,
{
f(self.lock().lookup(key))
}

fn insert<'a>(&'a self, key: &'a K, value: V, _guard: &'a Guard) -> Result<(), V> {
self.lock().insert(key, value)
}

fn delete(&self, key: &K, _guard: &Guard) -> Result<V, ()> {
self.lock().delete(key)
}
}

/// Converts nonblocking map into concurrent map
#[derive(Default, Debug)]
pub struct NonblockingConcurrentMap<K: ?Sized, V: Clone, M: NonblockingMap<K, V>> {
inner: M,
_marker: PhantomData<(Box<K>, V)>,
}

impl<K: ?Sized, V: Clone, M: NonblockingMap<K, V>> ConcurrentMap<K, V>
for NonblockingConcurrentMap<K, V, M>
{
fn lookup<'a, F, R>(&'a self, key: &'a K, guard: &'a Guard, f: F) -> R
where
F: FnOnce(Option<&V>) -> R,
{
f(self.inner.lookup(key, guard))
}

fn insert<'a>(&'a self, key: &'a K, value: V, guard: &'a Guard) -> Result<(), V> {
self.inner.insert(key, value, guard)
}

fn delete(&self, key: &K, guard: &Guard) -> Result<V, ()> {
self.inner.delete(key, guard).map(|v| v.clone())
}
}

/// Trait for a concurrent set
/// Trait for a concurrent set.
pub trait ConcurrentSet<T> {
/// Returns `true` iff the set contains the value.
fn contains(&self, value: &T) -> bool;
Expand Down
6 changes: 3 additions & 3 deletions homework/src/bin/hello_server.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use crossbeam_channel::{bounded, unbounded};
use cs431_homework::hello_server::{CancellableTcpListener, Handler, Statistics, ThreadPool};
use std::io;
use std::sync::mpsc::{channel, sync_channel};
use std::sync::Arc;

const ADDR: &str = "localhost:7878";
Expand All @@ -25,10 +25,10 @@ fn main() -> io::Result<()> {
let pool = Arc::new(ThreadPool::new(7));

// The (MPSC) channel of reports between workers and the reporter.
let (report_sender, report_receiver) = unbounded();
let (report_sender, report_receiver) = channel();

// The (SPSC one-shot) channel of stats between the reporter and the main thread.
let (stat_sender, stat_receiver) = bounded(0);
let (stat_sender, stat_receiver) = sync_channel(0);

// Listens to the address.
let listener = Arc::new(CancellableTcpListener::bind(ADDR)?);
Expand Down
14 changes: 8 additions & 6 deletions homework/src/elim_stack/base.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,13 +38,10 @@ pub trait Stack<T>: Default {

/// Pushes a value to the stack.
fn push(&self, t: T) {
let mut req = Owned::new(Self::PushReq::from(t));
let mut req = Owned::new(t.into());
let guard = pin();
loop {
match self.try_push(req, &guard) {
Ok(_) => break,
Err(r) => req = r,
}
while let Err(r) = self.try_push(req, &guard) {
req = r;
}
}

Expand All @@ -64,6 +61,11 @@ pub trait Stack<T>: Default {
#[derive(Debug)]
pub struct ElimStack<T, S: Stack<T>> {
pub(crate) inner: S,
// slot tags:
// - 0: no request
// - 1: push request
// - 2: pop request
// - 3: request acknowledged
pub(crate) slots: [Atomic<S::PushReq>; ELIM_SIZE],
_marker: PhantomData<T>,
}
Expand Down
9 changes: 4 additions & 5 deletions homework/src/elim_stack/elim.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,16 +15,15 @@ impl<T, S: Stack<T>> Stack<T> for ElimStack<T, S> {
req: Owned<Self::PushReq>,
guard: &Guard,
) -> Result<(), Owned<Self::PushReq>> {
let req = match self.inner.try_push(req, guard) {
Ok(()) => return Ok(()),
Err(req) => req,
let Err(req) = self.inner.try_push(req, guard) else {
return Ok(());
};

let index = get_random_elim_index();
let slot_ref = unsafe { self.slots.get_unchecked(index) };
let slot = slot_ref.load(Ordering::Acquire, guard);

unimplemented!()
todo!()
}

fn try_pop(&self, guard: &Guard) -> Result<Option<T>, ()> {
Expand All @@ -36,7 +35,7 @@ impl<T, S: Stack<T>> Stack<T> for ElimStack<T, S> {
let slot_ref = unsafe { self.slots.get_unchecked(index) };
let slot = slot_ref.load(Ordering::Acquire, guard);

unimplemented!()
todo!()
}

fn is_empty(&self, guard: &Guard) -> bool {
Expand Down
4 changes: 3 additions & 1 deletion homework/src/elim_stack/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,13 +18,15 @@ mod test {
let stack = ElimStack::default();

scope(|scope| {
let mut handles = Vec::new();
for _ in 0..10 {
let _unused = scope.spawn(|| {
let handle = scope.spawn(|| {
for i in 0..10_000 {
stack.push(i);
assert!(stack.pop().is_some());
}
});
handles.push(handle);
}
});

Expand Down
46 changes: 28 additions & 18 deletions homework/src/elim_stack/treiber_stack.rs
Original file line number Diff line number Diff line change
@@ -1,18 +1,22 @@
use core::mem::ManuallyDrop;
use core::mem::{self, ManuallyDrop};
use core::ops::Deref;
use core::ptr;
use core::sync::atomic::Ordering;

use crossbeam_epoch::{unprotected, Atomic, Guard, Owned};
use crossbeam_epoch::{Atomic, Guard, Owned, Shared};

use super::base::Stack;

#[derive(Debug)]
pub struct Node<T> {
data: ManuallyDrop<T>,
next: Atomic<Node<T>>,
next: *const Node<T>,
}

// Any particular `T` should never be accessed concurrently, so no need for `Sync`.
unsafe impl<T: Send> Send for Node<T> {}
unsafe impl<T: Send> Sync for Node<T> {}

/// Treiber's lock-free stack.
///
/// Usable with any number of producers and consumers.
Expand All @@ -25,7 +29,7 @@ impl<T> From<T> for Node<T> {
fn from(t: T) -> Self {
Self {
data: ManuallyDrop::new(t),
next: Atomic::null(),
next: ptr::null(),
}
}
}
Expand Down Expand Up @@ -54,31 +58,34 @@ impl<T> Stack<T> for TreiberStack<T> {
req: Owned<Self::PushReq>,
guard: &Guard,
) -> Result<(), Owned<Self::PushReq>> {
let mut req = req;
let head = self.head.load(Ordering::Relaxed, guard);
req.next.store(head, Ordering::Relaxed);
self.head
req.next = head.as_raw();

match self
.head
.compare_exchange(head, req, Ordering::Release, Ordering::Relaxed, guard)
.map(|_| ())
.map_err(|e| e.new)
{
Ok(_) => Ok(()),
Err(e) => Err(e.new),
}
}

fn try_pop(&self, guard: &Guard) -> Result<Option<T>, ()> {
let head = self.head.load(Ordering::Acquire, guard);
let Some(head_ref) = (unsafe { head.as_ref() }) else {
return Ok(None);
};
let next = head_ref.next.load(Ordering::Relaxed, guard);
let next = Shared::from(head_ref.next);

let _ = self
.head
.compare_exchange(head, next, Ordering::Relaxed, Ordering::Relaxed, guard)
.map_err(|_| ())?;

Ok(Some(unsafe {
let data = ptr::read(&head_ref.data);
guard.defer_destroy(head);
ManuallyDrop::into_inner(data)
}))
let data = ManuallyDrop::into_inner(unsafe { ptr::read(&head_ref.data) });
unsafe { guard.defer_destroy(head) };
Ok(Some(data))
}

fn is_empty(&self, guard: &Guard) -> bool {
Expand All @@ -88,9 +95,10 @@ impl<T> Stack<T> for TreiberStack<T> {

impl<T> Drop for TreiberStack<T> {
fn drop(&mut self) {
unsafe {
let guard = unprotected();
while let Ok(Some(_)) = self.try_pop(guard) {}
let mut o_curr = mem::take(&mut self.head);
while let Some(curr) = unsafe { o_curr.try_into_owned() }.map(Owned::into_box) {
drop(ManuallyDrop::into_inner(curr.data));
o_curr = curr.next.into();
}
}
}
Expand All @@ -105,13 +113,15 @@ mod test {
let stack = TreiberStack::default();

scope(|scope| {
let mut handles = Vec::new();
for _ in 0..10 {
let _unused = scope.spawn(|| {
let handle = scope.spawn(|| {
for i in 0..10_000 {
stack.push(i);
assert!(stack.pop().is_some());
}
});
handles.push(handle);
}
});

Expand Down
Loading

0 comments on commit 1829620

Please sign in to comment.