Skip to content

Commit

Permalink
Use frame-parallelism when processing a video
Browse files Browse the repository at this point in the history
  • Loading branch information
lu-zero authored and shssoichiro committed Oct 4, 2020
1 parent 3d0d49e commit 987f912
Showing 1 changed file with 64 additions and 31 deletions.
95 changes: 64 additions & 31 deletions av_metrics/src/video/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -153,38 +153,11 @@ trait VideoMetric: Send + Sync {
}));
}

let mut metrics = Vec::with_capacity(frame_limit.unwrap_or(0));
while frame_limit
.map(|limit| limit > metrics.len())
.unwrap_or(true)
{
if decoder1.get_bit_depth() > 8 {
let frame1 = decoder1.read_video_frame::<u16>();
let frame2 = decoder2.read_video_frame::<u16>();
if let (Ok(frame1), Ok(frame2)) = (frame1, frame2) {
metrics.push(self.process_frame(&frame1, &frame2)?);
} else {
break;
}
} else {
let frame1 = decoder1.read_video_frame::<u8>();
let frame2 = decoder2.read_video_frame::<u8>();
if let (Ok(frame1), Ok(frame2)) = (frame1, frame2) {
metrics.push(self.process_frame(&frame1, &frame2)?);
} else {
break;
}
}
if decoder1.get_bit_depth() > 8 {
self.process_video_mt::<D, u16>(decoder1, decoder2, frame_limit)
} else {
self.process_video_mt::<D, u8>(decoder1, decoder2, frame_limit)
}

if metrics.is_empty() {
return Err(MetricsError::UnsupportedInput {
reason: "No readable frames found in one or more input files",
}
.into());
}

self.aggregate_frame_results(&metrics)
}

fn process_frame<T: Pixel>(
Expand All @@ -197,4 +170,64 @@ trait VideoMetric: Send + Sync {
&self,
metrics: &[Self::FrameResult],
) -> Result<Self::VideoResult, Box<dyn Error>>;

#[cfg(feature = "decode")]
fn process_video_mt<D: Decoder, P: Pixel>(
&mut self,
decoder1: &mut D,
decoder2: &mut D,
frame_limit: Option<usize>,
) -> Result<Self::VideoResult, Box<dyn Error>> {
let num_threads = (rayon::current_num_threads() - 1).max(1);

let mut out = Vec::new();

let (send, recv) = crossbeam::channel::bounded(num_threads);

crossbeam::scope(|s| {
s.spawn(move |_| {
let mut decoded = 0;
while frame_limit.map(|limit| limit > decoded).unwrap_or(true) {
decoded += 1;
let frame1 = decoder1.read_video_frame::<P>();
let frame2 = decoder2.read_video_frame::<P>();
if let (Ok(frame1), Ok(frame2)) = (frame1, frame2) {
send.send((frame1, frame2)).unwrap();
} else {
break;
}
}
});

use rayon::prelude::*;
let mut metrics = Vec::with_capacity(frame_limit.unwrap_or(0));
loop {
let working_set: Vec<_> = (0..num_threads)
.into_par_iter()
.filter_map(|_w| {
recv.recv()
.map(|(f1, f2)| self.process_frame(&f1, &f2).unwrap())
.ok()
})
.collect();
if working_set.is_empty() {
break;
} else {
metrics.extend(working_set);
}
}

out = metrics;
})
.unwrap();

if out.is_empty() {
return Err(MetricsError::UnsupportedInput {
reason: "No readable frames found in one or more input files",
}
.into());
}

self.aggregate_frame_results(&out)
}
}

0 comments on commit 987f912

Please sign in to comment.