Replace std library locks with parking_lot (#1136)
Replaces all locks with the versions from parking_lot. Since we need
parking_lot for the task queue lock, we might as well use its versions
everywhere.
rinon authored May 28, 2024
2 parents d6a7b3c + b221d9e commit a8ee17e
Showing 11 changed files with 75 additions and 80 deletions.
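The mechanical change throughout the diff follows from two API differences: parking_lot locks do not poison when a thread panics, so `Mutex::lock`, `RwLock::read`/`write`, and `get_mut` hand back guards or references directly instead of a `Result` that needs `.unwrap()`, and parking_lot's `Condvar::wait` re-locks through a `&mut` guard rather than consuming the guard and returning a new one. Below is a minimal sketch of both patterns, assuming the `parking_lot` crate as a dependency; the `Queue` type and its methods are illustrative only and do not appear in rav1d.

```rust
use parking_lot::{Condvar, Mutex, RwLock};

/// Hypothetical task queue, used only to illustrate the lock APIs.
struct Queue {
    items: Mutex<Vec<u32>>,
    cond: Condvar,
    progress: RwLock<u32>,
}

impl Queue {
    fn push(&self, item: u32) {
        // std: self.items.lock().unwrap().push(item);
        // parking_lot never poisons, so lock() returns the guard directly.
        self.items.lock().push(item);
        self.cond.notify_one();
    }

    fn pop_blocking(&self) -> u32 {
        let mut items = self.items.lock();
        while items.is_empty() {
            // std: items = self.cond.wait(items).unwrap();
            // parking_lot re-locks in place through a &mut borrow and returns ().
            self.cond.wait(&mut items);
        }
        items.pop().unwrap()
    }

    fn snapshot(&self) -> u32 {
        // Same for RwLock: read()/write() return guards; try_write() returns an Option.
        *self.progress.read()
    }
}

fn main() {
    let q = Queue {
        items: Mutex::new(Vec::new()),
        cond: Condvar::new(),
        progress: RwLock::new(0),
    };
    q.push(7);
    assert_eq!(q.pop_blocking(), 7);
    assert_eq!(q.snapshot(), 0);
}
```

The diff below is that substitution applied file by file: `.lock().unwrap()` becomes `.lock()`, `cond.wait(guard).unwrap()` becomes `cond.wait(&mut guard)`, `try_write()` yields an `Option` instead of a `Result`, and the poison-recovery branch in `remove_bound` disappears because there is no poisoning to recover from.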
4 changes: 2 additions & 2 deletions include/dav1d/headers.rs
@@ -1,5 +1,6 @@
use crate::src::enum_map::EnumKey;
use atomig::Atomic;
use parking_lot::Mutex;
use std::ffi::c_int;
use std::ffi::c_uint;
use std::fmt;
@@ -11,7 +12,6 @@ use std::ops::Deref;
use std::ops::Sub;
use std::sync::atomic::Ordering;
use std::sync::Arc;
use std::sync::Mutex;
use strum::EnumCount;
use strum::FromRepr;

@@ -826,7 +826,7 @@ impl Rav1dITUTT35 {
pub fn to_immut(
mutable: Arc<Mutex<Vec<Rav1dITUTT35>>>,
) -> Arc<DRav1d<Box<[Rav1dITUTT35]>, Box<[Dav1dITUTT35]>>> {
let mutable = Arc::into_inner(mutable).unwrap().into_inner().unwrap();
let mutable = Arc::into_inner(mutable).unwrap().into_inner();
let immutable = mutable.into_boxed_slice();
let rav1d = immutable;
let dav1d = rav1d.iter().cloned().map(Dav1dITUTT35::from).collect();
6 changes: 3 additions & 3 deletions src/cdf.rs
@@ -17,12 +17,12 @@ use crate::src::levels::N_INTRA_PRED_MODES;
use crate::src::levels::N_TX_SIZES;
use crate::src::levels::N_UV_INTRA_PRED_MODES;
use crate::src::tables::dav1d_partition_type_count;
use parking_lot::RwLock;
use parking_lot::RwLockWriteGuard;
use std::cmp;
use std::ffi::c_uint;
use std::sync::atomic::AtomicU32;
use std::sync::Arc;
use std::sync::RwLock;
use std::sync::RwLockWriteGuard;
use strum::EnumCount;

#[derive(Clone, Default)]
@@ -167,7 +167,7 @@ impl CdfThreadContext {
pub fn cdf_write(&self) -> RwLockWriteGuard<CdfContext> {
match self {
Self::QCat(_) => panic!("Expected a Cdf"),
Self::Cdf(cdf) => cdf.cdf.try_write().ok().unwrap(),
Self::Cdf(cdf) => cdf.cdf.try_write().unwrap(),
}
}
}
20 changes: 10 additions & 10 deletions src/decode.rs
@@ -4056,14 +4056,14 @@ fn setup_tile(
&mut lf.lr_mask[sb_idx as usize].lr[p][unit_idx as usize]
};

let lr = lr_ref.get_mut().unwrap();
let lr = lr_ref.get_mut();
*lr = Av1RestorationUnit {
filter_v: [3, -7, 15],
filter_h: [3, -7, 15],
sgr_weights: [-32, 31],
..*lr
};
ts.lr_ref.get_mut().unwrap()[p] = *lr;
ts.lr_ref.get_mut()[p] = *lr;
}

if c.tc.len() > 1 {
@@ -4840,7 +4840,7 @@ fn rav1d_decode_frame_main(c: &Rav1dContext, f: &mut Rav1dFrameData) -> Rav1dRes
let Rav1dContextTaskType::Single(t) = &c.tc[0].task else {
panic!("Expected a single-threaded context");
};
let mut t = t.lock().unwrap();
let mut t = t.lock();

let frame_hdr = &***f.frame_hdr.as_ref().unwrap();

@@ -4905,7 +4905,7 @@ pub(crate) fn rav1d_decode_frame_exit(
let task_thread = &fc.task_thread;
// We use a blocking lock here because we have rare contention with other
// threads.
let mut f = fc.data.write().unwrap();
let mut f = fc.data.write();
if f.sr_cur.p.data.is_some() {
task_thread.error.store(0, Ordering::Relaxed);
}
@@ -4969,13 +4969,13 @@ pub(crate) unsafe fn rav1d_decode_frame(c: &Rav1dContext, fc: &Rav1dFrameContext
if c.tc.len() > 1 {
res = rav1d_task_create_tile_sbrow(fc, &f, 0, 1);
drop(f); // release the frame data before waiting for the other threads
let mut task_thread_lock = (*fc.task_thread.ttd).lock.lock().unwrap();
let mut task_thread_lock = (*fc.task_thread.ttd).lock.lock();
(*fc.task_thread.ttd).cond.notify_one();
if res.is_ok() {
while fc.task_thread.done[0].load(Ordering::SeqCst) == 0
|| fc.task_thread.task_counter.load(Ordering::SeqCst) > 0
{
task_thread_lock = fc.task_thread.cond.wait(task_thread_lock).unwrap();
fc.task_thread.cond.wait(&mut task_thread_lock);
}
}
drop(task_thread_lock);
@@ -5012,13 +5012,13 @@ fn get_upscale_x0(in_w: c_int, out_w: c_int, step: c_int) -> c_int {
pub unsafe fn rav1d_submit_frame(c: &mut Rav1dContext) -> Rav1dResult {
// wait for c->out_delayed[next] and move into c->out if visible
let (fc, out, _task_thread_lock) = if c.fc.len() > 1 {
let mut task_thread_lock = c.task_thread.lock.lock().unwrap();
let mut task_thread_lock = c.task_thread.lock.lock();
let next = c.frame_thread.next;
c.frame_thread.next = (c.frame_thread.next + 1) % c.fc.len() as u32;

let fc = &c.fc[next as usize];
while !fc.task_thread.finished.load(Ordering::SeqCst) {
task_thread_lock = fc.task_thread.cond.wait(task_thread_lock).unwrap();
fc.task_thread.cond.wait(&mut task_thread_lock);
}
let out_delayed = &mut c.frame_thread.out_delayed[next as usize];
if out_delayed.p.data.is_some() || fc.task_thread.error.load(Ordering::SeqCst) != 0 {
@@ -5043,7 +5043,7 @@ pub unsafe fn rav1d_submit_frame(c: &mut Rav1dContext) -> Rav1dResult {
let error = &mut *fc.task_thread.retval.try_lock().unwrap();
if error.is_some() {
c.cached_error = mem::take(&mut *error);
*c.cached_error_props.get_mut().unwrap() = out_delayed.p.m.clone();
*c.cached_error_props.get_mut() = out_delayed.p.m.clone();
let _ = mem::take(out_delayed);
} else if out_delayed.p.data.is_some() {
let progress = out_delayed.progress.as_ref().unwrap()[1].load(Ordering::Relaxed);
@@ -5086,7 +5086,7 @@ pub unsafe fn rav1d_submit_frame(c: &mut Rav1dContext) -> Rav1dResult {
let _ = mem::take(&mut f.mvs);
let _ = mem::take(&mut f.seq_hdr);
let _ = mem::take(&mut f.frame_hdr);
*c.cached_error_props.lock().unwrap() = c.in_0.m.clone();
*c.cached_error_props.lock() = c.in_0.m.clone();

f.tiles.clear();
fc.task_thread.finished.store(true, Ordering::SeqCst);
17 changes: 6 additions & 11 deletions src/disjoint_mut.rs
@@ -753,12 +753,12 @@ mod release {
#[cfg(debug_assertions)]
mod debug {
use super::*;
use parking_lot::Mutex;
use std::backtrace::Backtrace;
use std::backtrace::BacktraceStatus;
use std::fmt::Debug;
use std::ops::Bound;
use std::panic::Location;
use std::sync::Mutex;
use std::thread;
use std::thread::ThreadId;

@@ -854,10 +854,10 @@ impl<T: ?Sized + AsMutPtr> DisjointMut<T> {
impl<T: ?Sized + AsMutPtr> DisjointMut<T> {
#[track_caller]
fn add_mut_bounds(&self, current: DisjointMutBounds) {
for existing in self.bounds.immutable.lock().unwrap().iter() {
for existing in self.bounds.immutable.lock().iter() {
current.check_overlaps(existing);
}
let mut mut_bounds = self.bounds.mutable.lock().unwrap();
let mut mut_bounds = self.bounds.mutable.lock();
for existing in mut_bounds.iter() {
current.check_overlaps(existing);
}
@@ -866,24 +866,19 @@ mod debug {

#[track_caller]
fn add_immut_bounds(&self, current: DisjointMutBounds) {
let mut_bounds = self.bounds.mutable.lock().unwrap();
let mut_bounds = self.bounds.mutable.lock();
for existing in mut_bounds.iter() {
current.check_overlaps(existing);
}
self.bounds.immutable.lock().unwrap().push(current);
self.bounds.immutable.lock().push(current);
}

fn remove_bound(&self, bounds: &DisjointMutBounds) {
let all_bounds = if bounds.mutable {
let mut all_bounds = if bounds.mutable {
self.bounds.mutable.lock()
} else {
self.bounds.immutable.lock()
};
let Ok(mut all_bounds) = all_bounds else {
// Another thread has panicked holding a range lock. We can't
// remove anything.
return;
};
let idx = all_bounds
.iter()
.position(|r| r == bounds)
8 changes: 4 additions & 4 deletions src/internal.rs
@@ -89,6 +89,10 @@ use crate::src::thread_task::Rav1dTasks;
use atomig::Atom;
use atomig::Atomic;
use libc::ptrdiff_t;
use parking_lot::Condvar;
use parking_lot::Mutex;
use parking_lot::RwLock;
use parking_lot::RwLockReadGuard;
use std::ffi::c_int;
use std::ffi::c_uint;
use std::mem;
@@ -102,11 +106,7 @@ use std::sync::atomic::AtomicU8;
use std::sync::atomic::AtomicUsize;
use std::sync::atomic::Ordering;
use std::sync::Arc;
use std::sync::Condvar;
use std::sync::Mutex;
use std::sync::OnceLock;
use std::sync::RwLock;
use std::sync::RwLockReadGuard;
use std::thread::JoinHandle;
use zerocopy::AsBytes;
use zerocopy::FromBytes;
2 changes: 1 addition & 1 deletion src/lf_mask.rs
@@ -16,12 +16,12 @@ use crate::src::levels::TX_4X4;
use crate::src::tables::dav1d_block_dimensions;
use crate::src::tables::dav1d_txfm_dimensions;
use libc::ptrdiff_t;
use parking_lot::RwLock;
use std::cmp;
use std::ffi::c_int;
use std::sync::atomic::AtomicI8;
use std::sync::atomic::AtomicU16;
use std::sync::atomic::Ordering;
use std::sync::RwLock;

#[repr(C)]
pub struct Av1FilterLUT {
28 changes: 14 additions & 14 deletions src/lib.rs
@@ -46,6 +46,8 @@ use crate::src::picture::Rav1dThreadPicture;
use crate::src::thread_task::rav1d_task_delayed_fg;
use crate::src::thread_task::rav1d_worker_task;
use crate::src::thread_task::FRAME_ERROR;
use parking_lot::Condvar;
use parking_lot::Mutex;
use std::cmp;
use std::ffi::c_char;
use std::ffi::c_uint;
@@ -61,8 +63,6 @@ use std::sync::atomic::AtomicI32;
use std::sync::atomic::AtomicU32;
use std::sync::atomic::Ordering;
use std::sync::Arc;
use std::sync::Condvar;
use std::sync::Mutex;
use std::sync::Once;
use std::thread;
use to_method::To as _;
@@ -253,7 +253,7 @@ pub(crate) unsafe fn rav1d_open(c_out: &mut *mut Rav1dContext, s: &Rav1dSettings
for fc in (*c).fc.iter_mut() {
fc.task_thread.finished = AtomicBool::new(true);
fc.task_thread.ttd = Arc::clone(&(*c).task_thread);
let f = fc.data.get_mut().unwrap();
let f = fc.data.get_mut();
f.lf.last_sharpness = u8::MAX;
}
(*c).tc = (0..n_tc)
@@ -387,9 +387,9 @@ unsafe fn drain_picture(c: &mut Rav1dContext, out: &mut Rav1dPicture) -> Rav1dRe
for _ in 0..c.fc.len() {
let next = c.frame_thread.next;
let fc = &c.fc[next as usize];
let mut task_thread_lock = c.task_thread.lock.lock().unwrap();
let mut task_thread_lock = c.task_thread.lock.lock();
while !fc.task_thread.finished.load(Ordering::SeqCst) {
task_thread_lock = fc.task_thread.cond.wait(task_thread_lock).unwrap();
fc.task_thread.cond.wait(&mut task_thread_lock);
}
let out_delayed = &mut c.frame_thread.out_delayed[next as usize];
if out_delayed.p.data.is_some() || fc.task_thread.error.load(Ordering::SeqCst) != 0 {
@@ -418,7 +418,7 @@ unsafe fn drain_picture(c: &mut Rav1dContext, out: &mut Rav1dPicture) -> Rav1dRe
mem::take(&mut *fc.task_thread.retval.try_lock().unwrap())
.err_or(())
.inspect_err(|_| {
*c.cached_error_props.get_mut().unwrap() = out_delayed.p.m.clone();
*c.cached_error_props.get_mut() = out_delayed.p.m.clone();
let _ = mem::take(out_delayed);
})?;
if out_delayed.p.data.is_some() {
@@ -622,10 +622,10 @@ pub(crate) unsafe fn rav1d_flush(c: &mut Rav1dContext) {
}
c.flush.store(true, Ordering::SeqCst);
if c.tc.len() > 1 {
let mut task_thread_lock = c.task_thread.lock.lock().unwrap();
let mut task_thread_lock = c.task_thread.lock.lock();
for tc in c.tc.iter() {
while !tc.flushed() {
task_thread_lock = tc.thread_data.cond.wait(task_thread_lock).unwrap();
tc.thread_data.cond.wait(&mut task_thread_lock);
}
}
for fc in c.fc.iter_mut() {
@@ -692,7 +692,7 @@ impl Drop for Rav1dContext {
unsafe {
if self.tc.len() > 1 {
let ttd: &TaskThreadData = &*self.task_thread;
let task_thread_lock = ttd.lock.lock().unwrap();
let task_thread_lock = ttd.lock.lock();
for tc in self.tc.iter() {
tc.thread_data.die.store(true, Ordering::Relaxed);
}
@@ -707,13 +707,13 @@ impl Drop for Rav1dContext {
}
let fc_len = self.fc.len();
for fc in self.fc.iter_mut() {
let f = fc.data.get_mut().unwrap();
let f = fc.data.get_mut();
if fc_len > 1 {
let _ = mem::take(&mut f.lowest_pixel_mem); // TODO: remove when context is owned
}
mem::take(fc.in_cdf.get_mut().unwrap()); // TODO: remove when context is owned
mem::take(fc.frame_thread_progress.frame.get_mut().unwrap()); // TODO: remove when context is owned
mem::take(fc.frame_thread_progress.copy_lpf.get_mut().unwrap()); // TODO: remove when context is owned
mem::take(fc.in_cdf.get_mut()); // TODO: remove when context is owned
mem::take(fc.frame_thread_progress.frame.get_mut()); // TODO: remove when context is owned
mem::take(fc.frame_thread_progress.copy_lpf.get_mut()); // TODO: remove when context is owned
let _ = mem::take(&mut f.frame_thread); // TODO: remove when context is owned
mem::take(&mut fc.task_thread.tasks); // TODO: remove when context is owned
let _ = mem::take(&mut f.ts); // TODO: remove when context is owned
@@ -786,7 +786,7 @@ pub unsafe extern "C" fn dav1d_get_decode_error_data_props(
(|| {
validate_input!((!c.is_null(), EINVAL))?;
validate_input!((!out.is_null(), EINVAL))?;
out.write(mem::take(&mut *((*c).cached_error_props).get_mut().unwrap()).into());
out.write(mem::take(&mut *((*c).cached_error_props).get_mut()).into());
Ok(())
})()
.into()
8 changes: 4 additions & 4 deletions src/mem.rs
@@ -1,4 +1,4 @@
use std::sync::Mutex;
use parking_lot::Mutex;

pub struct MemPool<T> {
bufs: Mutex<Vec<Vec<T>>>,
@@ -12,7 +12,7 @@ impl<T> MemPool<T> {
}

pub fn _pop(&self, size: usize) -> Vec<T> {
if let Some(mut buf) = self.bufs.lock().unwrap().pop() {
if let Some(mut buf) = self.bufs.lock().pop() {
if size > buf.capacity() {
// TODO fallible allocation
buf.reserve(size - buf.len());
@@ -32,7 +32,7 @@ impl<T> MemPool<T> {
where
T: Copy,
{
if let Some(buf) = self.bufs.lock().unwrap().pop() {
if let Some(buf) = self.bufs.lock().pop() {
if size <= buf.len() {
return buf;
}
@@ -42,7 +42,7 @@ impl<T> MemPool<T> {
}

pub fn push(&self, buf: Vec<T>) {
self.bufs.lock().unwrap().push(buf);
self.bufs.lock().push(buf);
}
}

8 changes: 4 additions & 4 deletions src/obu.rs
@@ -2507,14 +2507,14 @@ unsafe fn parse_obus(
);
c.event_flags |= c.refs[frame_hdr.existing_frame_idx as usize].p.flags.into();
} else {
let mut task_thread_lock = c.task_thread.lock.lock().unwrap();
let mut task_thread_lock = c.task_thread.lock.lock();
// Need to append this to the frame output queue.
let next = c.frame_thread.next;
c.frame_thread.next = (c.frame_thread.next + 1) % c.fc.len() as u32;

let fc = &c.fc[next as usize];
while !fc.task_thread.finished.load(Ordering::SeqCst) {
task_thread_lock = fc.task_thread.cond.wait(task_thread_lock).unwrap();
fc.task_thread.cond.wait(&mut task_thread_lock);
}
let out_delayed = &mut c.frame_thread.out_delayed[next as usize];
if out_delayed.p.data.is_some() || fc.task_thread.error.load(Ordering::SeqCst) != 0
@@ -2540,7 +2540,7 @@ unsafe fn parse_obus(
let error = &mut *fc.task_thread.retval.try_lock().unwrap();
if error.is_some() {
c.cached_error = mem::take(error);
*c.cached_error_props.get_mut().unwrap() = out_delayed.p.m.clone();
*c.cached_error_props.get_mut() = out_delayed.p.m.clone();
let _ = mem::take(out_delayed);
} else if out_delayed.p.data.is_some() {
let progress =
@@ -2633,7 +2633,7 @@ pub(crate) unsafe fn rav1d_parse_obus(

parse_obus(c, r#in, props, gb)
.inspect_err(|_| {
*c.cached_error_props.get_mut().unwrap() = props.clone();
*c.cached_error_props.get_mut() = props.clone();
writeln!(
c.logger,
"{}",
(Diffs for the remaining changed files were not loaded on this page.)
