From eed5047765da8290be4cb3e297b4a5d5482b2d1d Mon Sep 17 00:00:00 2001
From: newideas99
Date: Sun, 13 Oct 2024 19:44:34 -0400
Subject: [PATCH 1/2] updated decoder cache

---
 crates/rendering/src/decoder.rs | 294 ++++++++++++++------------------
 1 file changed, 128 insertions(+), 166 deletions(-)

diff --git a/crates/rendering/src/decoder.rs b/crates/rendering/src/decoder.rs
index 3071dcc6..b8047d60 100644
--- a/crates/rendering/src/decoder.rs
+++ b/crates/rendering/src/decoder.rs
@@ -1,9 +1,10 @@
 use std::{
     cell::Cell,
-    collections::BTreeMap,
+    collections::{BTreeMap, VecDeque},
     path::PathBuf,
     ptr::{null, null_mut},
     sync::{mpsc, Arc},
+    time::{Duration, Instant},
 };
 
 use ffmpeg::{
@@ -18,20 +19,28 @@ use ffmpeg_sys_next::{
     AVPixelFormat, AV_CODEC_HW_CONFIG_METHOD_HW_DEVICE_CTX,
 };
 
+const FRAME_CACHE_SIZE: usize = 50;
+const MAX_CACHE_MEMORY: usize = 1024 * 1024 * 1024; // 1 GB max cache size
+const CACHE_CLEANUP_INTERVAL: Duration = Duration::from_secs(60); // Clean up every 60 seconds
+
 pub type DecodedFrame = Arc<Vec<u8>>;
 
 enum VideoDecoderMessage {
-    GetFrame(u32, tokio::sync::oneshot::Sender<Option<Arc<Vec<u8>>>>),
+    GetFrame(u32, tokio::sync::oneshot::Sender<Option<DecodedFrame>>),
+}
+
+#[derive(Clone)]
+struct CachedFrame {
+    frame_number: u32,
+    frame: DecodedFrame,
+    last_accessed: Instant,
 }
 
 fn ts_to_frame(ts: i64, time_base: Rational, frame_rate: Rational) -> u32 {
-    // dbg!((ts, time_base, frame_rate));
     ((ts * time_base.numerator() as i64 * frame_rate.numerator() as i64)
         / (time_base.denominator() as i64 * frame_rate.denominator() as i64)) as u32
 }
 
-const FRAME_CACHE_SIZE: usize = 50;
-
 pub struct AsyncVideoDecoder;
 
 impl AsyncVideoDecoder {
@@ -40,7 +49,6 @@ impl AsyncVideoDecoder {
 
         std::thread::spawn(move || {
             let mut input = ffmpeg::format::input(&path).unwrap();
-
             let input_stream = input
                 .streams()
                 .best(ffmpeg::media::Type::Video)
@@ -72,206 +80,140 @@ impl AsyncVideoDecoder {
             let time_base = input_stream.time_base();
             let frame_rate = input_stream.rate();
 
-            // Create a decoder for the video stream
             let mut decoder = context.decoder().video().unwrap();
 
-            use ffmpeg::format::Pixel;
-            use ffmpeg::software::scaling::{context::Context, flag::Flags};
-
             let mut scaler_input_format = hw_device
                 .as_ref()
                 .map(|d| d.pix_fmt)
                 .unwrap_or(decoder.format());
 
-            let mut scaler = Context::get(
+            let mut scaler = ffmpeg::software::scaling::context::Context::get(
                 scaler_input_format,
                 decoder.width(),
                 decoder.height(),
                 Pixel::RGBA,
                 decoder.width(),
                 decoder.height(),
-                Flags::BILINEAR,
+                ffmpeg::software::scaling::flag::Flags::BILINEAR,
             )
             .unwrap();
 
             let mut temp_frame = ffmpeg::frame::Video::empty();
-
-            let render_more_margin = (FRAME_CACHE_SIZE / 4) as u32;
-
-            let mut cache = BTreeMap::<u32, Arc<Vec<u8>>>::new();
-            // active frame is a frame that triggered decode.
-            // frames that are within render_more_margin of this frame won't trigger decode.
-            let mut last_active_frame = None::<u32>;
-
+            let mut cache = VecDeque::new();
+            let mut cache_size = 0;
+            let mut last_cleanup = Instant::now();
             let mut last_decoded_frame = None::<u32>;
 
-            struct PacketStuff<'a> {
-                packets: PacketIter<'a>,
-                skipped_packet: Option<(Stream<'a>, Packet)>,
-            }
-
             let mut peekable_requests = PeekableReceiver { rx, peeked: None };
             let mut packets = input.packets();
-            // let mut packet_stuff = PacketStuff {
-            //     packets: input.packets(),
-            //     skipped_packet: None,
-            // };
 
             while let Ok(r) = peekable_requests.recv() {
                 match r {
                     VideoDecoderMessage::GetFrame(frame_number, sender) => {
-                        // println!("retrieving frame {frame_number}");
+                        let mut frame_to_send: Option<DecodedFrame> = None;
 
-                        let mut sender = if let Some(cached) = cache.get(&frame_number) {
-                            // println!("sending frame {frame_number} from cache");
-                            sender.send(Some(cached.clone())).ok();
-                            continue;
+                        if let Some(index) = cache.iter().position(|f: &CachedFrame| f.frame_number == frame_number) {
+                            let mut cached = cache.remove(index).unwrap();
+                            cached.last_accessed = Instant::now();
+                            cache.push_front(cached.clone());
+                            frame_to_send = Some(cached.frame.clone());
                         } else {
-                            Some(sender)
-                        };
-
-                        let cache_min = frame_number.saturating_sub(FRAME_CACHE_SIZE as u32 / 2);
-                        let cache_max = frame_number + FRAME_CACHE_SIZE as u32 / 2;
-
-                        if frame_number <= 0
-                            || last_decoded_frame
-                                .map(|f| {
-                                    frame_number < f ||
-                                    // seek forward for big jumps. this threshold is arbitrary but should be derived from i-frames in future
-                                    frame_number - f > FRAME_CACHE_SIZE as u32
-                                })
-                                .unwrap_or(true)
-                        {
-                            let timestamp_us =
-                                ((frame_number as f32 / frame_rate.numerator() as f32)
+                            if frame_number <= 0
+                                || last_decoded_frame
+                                    .map(|f| frame_number < f || frame_number - f > FRAME_CACHE_SIZE as u32)
+                                    .unwrap_or(true)
+                            {
+                                let timestamp_us = ((frame_number as f32 / frame_rate.numerator() as f32)
                                     * 1_000_000.0) as i64;
-                            let position = timestamp_us.rescale((1, 1_000_000), rescale::TIME_BASE);
-
-                            println!("seeking to {position} for frame {frame_number}");
-
-                            decoder.flush();
-                            input.seek(position, ..position).unwrap();
-                            cache.clear();
-                            last_decoded_frame = None;
-
-                            packets = input.packets();
-                        }
-
-                        last_active_frame = Some(frame_number);
-
-                        loop {
-                            if peekable_requests.peek().is_some() {
-                                break;
+                                let position = timestamp_us.rescale((1, 1_000_000), rescale::TIME_BASE);
+
+                                decoder.flush();
+                                input.seek(position, ..position).unwrap();
+                                cache.clear();
+                                cache_size = 0;
+                                last_decoded_frame = None;
+                                packets = input.packets();
                             }
 
-                            let Some((stream, packet)) = packets.next() else {
-                                break;
-                            };
-
-                            if stream.index() == input_stream_index {
-                                let packet_frame =
-                                    ts_to_frame(packet.pts().unwrap(), time_base, frame_rate);
-                                // println!("sending frame {packet_frame} packet");
-
-                                decoder.send_packet(&packet).ok(); // decode failures are ok, we just fail to return a frame
-
-                                let mut exit = false;
-
-                                while decoder.receive_frame(&mut temp_frame).is_ok() {
-                                    let current_frame = ts_to_frame(
-                                        temp_frame.pts().unwrap(),
-                                        time_base,
-                                        frame_rate,
-                                    );
-                                    // println!("processing frame {current_frame}");
-                                    last_decoded_frame = Some(current_frame);
-
-                                    let exceeds_cache_bounds = current_frame > cache_max;
-                                    let too_small_for_cache_bounds = current_frame < cache_min;
-
-                                    let hw_frame =
-                                        hw_device.as_ref().and_then(|d| d.get_hwframe(&temp_frame));
-
-                                    let frame = hw_frame.as_ref().unwrap_or(&temp_frame);
-
-                                    if frame.format() != scaler_input_format {
-                                        // Reinitialize the scaler with the new input format
-                                        scaler_input_format =
-                                            frame.format();
-                                        scaler = Context::get(
-                                            scaler_input_format,
-                                            decoder.width(),
-                                            decoder.height(),
-                                            Pixel::RGBA,
-                                            decoder.width(),
-                                            decoder.height(),
-                                            Flags::BILINEAR,
-                                        )
-                                        .unwrap();
-                                    }
-
-                                    let mut rgb_frame = frame::Video::empty();
-                                    scaler.run(frame, &mut rgb_frame).unwrap();
-
-                                    let width = rgb_frame.width() as usize;
-                                    let height = rgb_frame.height() as usize;
-                                    let stride = rgb_frame.stride(0);
-                                    let data = rgb_frame.data(0);
-                                    let expected_size = width * height * 4;
-
-                                    let mut frame_buffer = Vec::with_capacity(expected_size);
+                            'packet_loop: loop {
+                                if peekable_requests.peek().is_some() {
+                                    break;
+                                }
+                                let Some((stream, packet)) = packets.next() else {
+                                    break;
+                                };
+
+                                if stream.index() == input_stream_index {
+                                    decoder.send_packet(&packet).ok();
+
+                                    while decoder.receive_frame(&mut temp_frame).is_ok() {
+                                        let current_frame = ts_to_frame(
+                                            temp_frame.pts().unwrap(),
+                                            time_base,
+                                            frame_rate,
+                                        );
+                                        last_decoded_frame = Some(current_frame);
+
+                                        let hw_frame = hw_device.as_ref().and_then(|d| d.get_hwframe(&temp_frame));
+                                        let frame = hw_frame.as_ref().unwrap_or(&temp_frame);
+
+                                        if frame.format() != scaler_input_format {
+                                            scaler_input_format = frame.format();
+                                            scaler = ffmpeg::software::scaling::context::Context::get(
+                                                scaler_input_format,
+                                                decoder.width(),
+                                                decoder.height(),
+                                                Pixel::RGBA,
+                                                decoder.width(),
+                                                decoder.height(),
+                                                ffmpeg::software::scaling::flag::Flags::BILINEAR,
+                                            )
+                                            .unwrap();
+                                        }
 
-                                    // account for stride > width
-                                    for line_data in data.chunks_exact(stride) {
-                                        frame_buffer.extend_from_slice(&line_data[0..width * 4]);
-                                    }
+                                        let mut rgb_frame = frame::Video::empty();
+                                        scaler.run(frame, &mut rgb_frame).unwrap();
 
-                                    let frame = Arc::new(frame_buffer);
+                                        let width = rgb_frame.width() as usize;
+                                        let height = rgb_frame.height() as usize;
+                                        let stride = rgb_frame.stride(0);
+                                        let data = rgb_frame.data(0);
 
-                                    if current_frame == frame_number {
-                                        if let Some(sender) = sender.take() {
-                                            sender.send(Some(frame.clone())).ok();
+                                        let mut frame_buffer = Vec::with_capacity(width * height * 4);
+                                        for line_data in data.chunks_exact(stride) {
+                                            frame_buffer.extend_from_slice(&line_data[0..width * 4]);
                                         }
-                                    }
 
-                                    if !too_small_for_cache_bounds {
-                                        if cache.len() >= FRAME_CACHE_SIZE {
-                                            if let Some(last_active_frame) = &last_active_frame {
-                                                let frame = if frame_number > *last_active_frame {
-                                                    *cache.keys().next().unwrap()
-                                                } else if frame_number < *last_active_frame {
-                                                    *cache.keys().next_back().unwrap()
-                                                } else {
-                                                    let min = *cache.keys().min().unwrap();
-                                                    let max = *cache.keys().max().unwrap();
-
-                                                    if current_frame > max {
-                                                        min
-                                                    } else {
-                                                        max
-                                                    }
-                                                };
-
-                                                cache.remove(&frame);
-                                            } else {
-                                                cache.clear()
-                                            }
+                                        let frame_size = frame_buffer.len();
+                                        let new_frame = Arc::new(frame_buffer);
+
+                                        if current_frame == frame_number && frame_to_send.is_none() {
+                                            frame_to_send = Some(new_frame.clone());
                                         }
 
-                                        cache.insert(current_frame, frame);
-                                    }
+                                        cache.push_front(CachedFrame {
+                                            frame_number: current_frame,
+                                            frame: new_frame,
+                                            last_accessed: Instant::now(),
+                                        });
+                                        cache_size += frame_size;
 
-                                    exit = exit || exceeds_cache_bounds;
-                                }
+                                        Self::cleanup_cache(&mut cache, &mut cache_size);
 
-                                if exit {
-                                    break;
+                                        if frame_to_send.is_some() {
+                                            break 'packet_loop;
+                                        }
+                                    }
                                 }
                             }
                         }
 
-                        if sender.is_some() {
-                            println!("failed to send frame {frame_number}");
+                        // Send the frame outside of all loops
+                        sender.send(frame_to_send).ok();
+
+                        if last_cleanup.elapsed() > CACHE_CLEANUP_INTERVAL {
+                            Self::aggressive_cleanup(&mut cache, &mut cache_size);
+                            last_cleanup = Instant::now();
                         }
                     }
                 }
            }
        });

        AsyncVideoDecoderHandle { sender: tx }
    }
@@ -280,6 +222,27 @@ impl AsyncVideoDecoder {
 
         AsyncVideoDecoderHandle { sender: tx }
     }
+
+    fn cleanup_cache(cache: &mut VecDeque<CachedFrame>, cache_size: &mut usize) {
+        while *cache_size > MAX_CACHE_MEMORY || cache.len() > FRAME_CACHE_SIZE {
+            if let Some(old_frame) = cache.pop_back() {
+                *cache_size -= old_frame.frame.len();
+            } else {
+                break;
+            }
+        }
+    }
+
+    fn aggressive_cleanup(cache: &mut VecDeque<CachedFrame>, cache_size: &mut usize) {
+        let now = Instant::now();
+        cache.retain(|frame| {
+            let keep = now.duration_since(frame.last_accessed) < Duration::from_secs(300); // 5 minutes
+            if !keep {
+                *cache_size -= frame.frame.len();
+            }
+            keep
+        });
+    }
 }
 
 #[derive(Clone)]
@@ -288,7 +251,7 @@ pub struct AsyncVideoDecoderHandle {
 }
 
 impl AsyncVideoDecoderHandle {
-    pub async fn get_frame(&self, frame_number: u32) -> Option<Arc<Vec<u8>>> {
+    pub async fn get_frame(&self, frame_number: u32) -> Option<DecodedFrame> {
         let (tx, rx) = tokio::sync::oneshot::channel();
         self.sender
             .send(VideoDecoderMessage::GetFrame(frame_number, tx))
@@ -318,7 +281,6 @@ impl<T> PeekableReceiver<T> {
     }
 
     fn try_recv(&mut self) -> Result<T, mpsc::TryRecvError> {
-        println!("try_recv");
         if let Some(value) = self.peeked.take() {
             Ok(value)
         } else {
@@ -472,4 +434,4 @@ impl CodecContextExt for codec::context::Context {
         })
     }
 }
-}
+}
\ No newline at end of file
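The first patch replaces the decoder's BTreeMap frame cache and its ad-hoc min/max eviction with an LRU scheme: a VecDeque ordered from most to least recently used, bounded both by entry count (FRAME_CACHE_SIZE) and by total bytes (MAX_CACHE_MEMORY), plus an age-based sweep that runs at most once per CACHE_CLEANUP_INTERVAL. Lookups become a linear scan (iter().position) instead of a logarithmic map lookup, which is cheap at 50 entries. The sketch below reproduces that eviction policy in isolation so it can be exercised outside the decoder thread; the main harness, the plain Vec<u8> frames (the patch stores Arc<Vec<u8>>), and the 1080p frame size are illustrative stand-ins rather than code from the patch.

use std::collections::VecDeque;
use std::time::{Duration, Instant};

const FRAME_CACHE_SIZE: usize = 50;
const MAX_CACHE_MEMORY: usize = 1024 * 1024 * 1024; // 1 GB, as in the patch

struct CachedFrame {
    frame_number: u32,
    frame: Vec<u8>, // Arc<Vec<u8>> in the patch
    last_accessed: Instant,
}

// Mirrors `cleanup_cache`: evict from the back (least recently used) until
// both the entry-count and byte budgets are satisfied.
fn cleanup_cache(cache: &mut VecDeque<CachedFrame>, cache_size: &mut usize) {
    while *cache_size > MAX_CACHE_MEMORY || cache.len() > FRAME_CACHE_SIZE {
        match cache.pop_back() {
            Some(old) => *cache_size -= old.frame.len(),
            None => break,
        }
    }
}

// Mirrors `aggressive_cleanup`: drop entries untouched for five minutes,
// regardless of the budgets above.
fn aggressive_cleanup(cache: &mut VecDeque<CachedFrame>, cache_size: &mut usize) {
    let now = Instant::now();
    cache.retain(|f| {
        let keep = now.duration_since(f.last_accessed) < Duration::from_secs(300);
        if !keep {
            *cache_size -= f.frame.len();
        }
        keep
    });
}

fn main() {
    let mut cache = VecDeque::new();
    let mut cache_size = 0usize;

    // Insert 60 fake 1080p RGBA frames; the newest always sits at the front.
    for frame_number in 0..60u32 {
        let frame = vec![0u8; 1920 * 1080 * 4];
        cache_size += frame.len();
        cache.push_front(CachedFrame {
            frame_number,
            frame,
            last_accessed: Instant::now(),
        });
        cleanup_cache(&mut cache, &mut cache_size);
    }

    // The count budget caps the cache at 50 entries even though the byte
    // budget (1 GB) is nowhere near exhausted.
    assert_eq!(cache.len(), FRAME_CACHE_SIZE);

    aggressive_cleanup(&mut cache, &mut cache_size); // nothing is stale yet
    assert_eq!(cache.len(), FRAME_CACHE_SIZE);
    println!(
        "cache holds {} frames ({} bytes); newest is frame {}",
        cache.len(),
        cache_size,
        cache.front().map(|f| f.frame_number).unwrap_or(0)
    );
}

Evicting from the back is correct here only because every cache hit is moved to the front, so the back is always the least recently used entry; that invariant is what the hit path's remove-then-push_front step in the patch maintains.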
From 6a7e0404fa872bf357c5dac121858ff261d425f5 Mon Sep 17 00:00:00 2001
From: newideas99
Date: Sun, 13 Oct 2024 20:36:17 -0400
Subject: [PATCH 2/2] Switched to bounded channels

---
 Cargo.lock                        |   1 +
 apps/desktop/src-tauri/src/lib.rs |   5 +-
 crates/rendering/Cargo.toml       |   1 +
 crates/rendering/src/lib.rs       | 156 +++++++++++++++++-------------
 4 files changed, 96 insertions(+), 67 deletions(-)

diff --git a/Cargo.lock b/Cargo.lock
index 68e42f6f..0d654f53 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -728,6 +728,7 @@ dependencies = [
  "futures-intrusive",
  "lru",
  "nix 0.29.0",
+ "num_cpus",
  "serde",
  "serde_json",
  "specta",
diff --git a/apps/desktop/src-tauri/src/lib.rs b/apps/desktop/src-tauri/src/lib.rs
index 4b70705f..26d9b0c0 100644
--- a/apps/desktop/src-tauri/src/lib.rs
+++ b/apps/desktop/src-tauri/src/lib.rs
@@ -53,6 +53,7 @@ use tauri_plugin_notification::PermissionState;
 use tauri_plugin_shell::ShellExt;
 use tauri_specta::Event;
 use tokio::sync::watch;
+use tokio::sync::mpsc;
 use tokio::task;
 use tokio::{
     sync::{Mutex, RwLock},
@@ -730,7 +731,9 @@ async fn render_to_file_impl(
     let decoders = editor_instance.decoders.clone();
     let options = editor_instance.render_constants.options.clone();
 
-    let (tx_image_data, mut rx_image_data) = tokio::sync::mpsc::unbounded_channel::<Vec<u8>>();
+    // Tune this to whatever best suits the target hardware
+    let buffer_size = 60 * 3; // 3 seconds at 60 fps, or 6 seconds at 30 fps
+    let (tx_image_data, mut rx_image_data) = mpsc::channel::<Vec<u8>>(buffer_size); // Adjust buffer size as needed
 
     let output_folder = output_path.parent().unwrap();
     std::fs::create_dir_all(output_folder)
diff --git a/crates/rendering/Cargo.toml b/crates/rendering/Cargo.toml
index 1e4db5cb..894cda95 100644
--- a/crates/rendering/Cargo.toml
+++ b/crates/rendering/Cargo.toml
@@ -25,3 +25,4 @@ ffmpeg.workspace = true
 lru = "0.12.4"
 ffmpeg-sys-next = "7.0.2"
 futures = "0.3.30"
+num_cpus = "1.13.1"
diff --git a/crates/rendering/src/lib.rs b/crates/rendering/src/lib.rs
index db55a8c7..a8d0d6f1 100644
--- a/crates/rendering/src/lib.rs
+++ b/crates/rendering/src/lib.rs
@@ -9,10 +9,18 @@ use std::collections::HashMap;
 use wgpu::util::DeviceExt;
 use wgpu::COPY_BYTES_PER_ROW_ALIGNMENT;
 
+use tokio::sync::mpsc;
+use tokio::time::{timeout, Duration};
+use futures::stream::{self, StreamExt};
+use std::sync::Arc;
+use num_cpus;
+
 use cap_project::{
     AspectRatio, BackgroundSource, CameraXPosition, CameraYPosition, Crop, ProjectConfiguration,
     XY,
 };
 
+
+
 use std::time::Instant;
 
 pub mod decoder;
@@ -97,10 +105,10 @@ impl RecordingDecoders {
 pub async fn render_video_to_channel(
     options: RenderOptions,
     project: ProjectConfiguration,
-    sender: tokio::sync::mpsc::UnboundedSender<Vec<u8>>,
+    sender: mpsc::Sender<Vec<u8>>,
     decoders: RecordingDecoders,
 ) -> Result<(), String> {
-    let constants = RenderVideoConstants::new(options).await?;
+    let constants = Arc::new(RenderVideoConstants::new(options).await?);
 
     println!("Setting up FFmpeg input for screen recording...");
 
@@ -109,82 +117,98 @@ pub async fn render_video_to_channel(
     let start_time = Instant::now();
 
     let duration = project.timeline().map(|t| t.duration()).unwrap_or(f64::MAX);
-
-    let render_handle: tokio::task::JoinHandle<Result<u32, String>> = tokio::spawn(async move {
-        let mut frame_number = 0;
-
-        let uniforms = ProjectUniforms::new(&constants, &project);
-        let background = Background::from(project.background.source.clone());
-
-        loop {
-            if frame_number as f64 > 30_f64 * duration {
-                break;
-            };
-
-            let time = if let Some(timeline) = project.timeline() {
-                match timeline.get_recording_time(frame_number as f64 / 30_f64) {
-                    Some(time) => time,
-                    None => break,
-                }
-            } else {
-                frame_number as f64 / 30_f64
-            };
-
-            let Some((screen_frame, camera_frame)) =
-                decoders.get_frames((time * 30.0) as u32).await
-            else {
-                break;
-            };
-
-            let frame = match produce_frame(
-                &constants,
-                &screen_frame,
-                &camera_frame,
-                background,
-                &uniforms,
-            )
-            .await
-            {
-                Ok(frame) => frame,
-                Err(e) => {
-                    eprintln!("{e}");
-                    break;
-                }
-            };
-
-            if sender.send(frame).is_err() {
-                eprintln!("Failed to send processed frame to channel");
-                break;
+    let uniforms = Arc::new(ProjectUniforms::new(&constants, &project));
+    let background = Background::from(project.background.source.clone());
+
+    let total_frames = (30_f64 * duration) as u32;
+
+    let stream = stream::iter(0..total_frames)
+        .map(|frame_number| {
+            let constants = constants.clone();
+            let uniforms = uniforms.clone();
+            let decoders = decoders.clone();
+            let project = project.clone();
+            async move {
+                process_frame(frame_number, &constants, &uniforms, &decoders, &project, background).await
             }
-
-            frame_number += 1;
-            if frame_number % 60 == 0 {
-                let elapsed = start_time.elapsed();
-                println!(
-                    "Rendered {} frames in {:?} seconds",
-                    frame_number,
-                    elapsed.as_secs_f32()
-                );
+        })
+        .buffer_unordered(num_cpus::get());
+
+    let frames_processed = stream
+        .fold(0, |acc, result| {
+            let sender = sender.clone(); // Clone the sender here
+            async move {
+                match result {
+                    Ok(Some((frame_number, frame))) => {
+                        if let Err(e) = sender.send(frame).await {
+                            eprintln!("Failed to send processed frame to channel: {}", e);
+                        }
+                        let new_acc = acc + 1;
+                        if new_acc % 60 == 0 {
+                            let now = Instant::now();
+                            let elapsed = now.duration_since(start_time);
+                            println!(
+                                "Rendered {} frames in {:?} seconds",
+                                new_acc,
+                                elapsed.as_secs_f32()
+                            );
+                        }
+                        new_acc
+                    }
+                    Ok(None) => acc,
+                    Err(e) => {
+                        eprintln!("Error processing frame: {}", e);
+                        acc
+                    }
+                }
             }
-        }
-
-        println!("Render loop exited");
-
-        Ok(frame_number)
-    });
-
-    let total_frames = render_handle.await.map_err(|e| e.to_string())??;
+        })
+        .await;
 
     let total_time = start_time.elapsed();
     println!(
         "Render complete. Processed {} frames in {:?} seconds",
-        total_frames,
+        frames_processed,
         total_time.as_secs_f32()
     );
 
     Ok(())
 }
 
+async fn process_frame(
+    frame_number: u32,
+    constants: &Arc<RenderVideoConstants>,
+    uniforms: &Arc<ProjectUniforms>,
+    decoders: &RecordingDecoders,
+    project: &ProjectConfiguration,
+    background: Background,
+) -> Result<Option<(u32, Vec<u8>)>, String> {
+    let time = if let Some(timeline) = project.timeline() {
+        match timeline.get_recording_time(frame_number as f64 / 30_f64) {
+            Some(time) => time,
+            None => return Ok(None),
+        }
+    } else {
+        frame_number as f64 / 30_f64
+    };
+
+    let (screen_frame, camera_frame) = match decoders.get_frames((time * 30.0) as u32).await {
+        Some(frames) => frames,
+        None => return Ok(None),
+    };
+
+    match timeout(
+        Duration::from_secs(30),
+        produce_frame(constants, &screen_frame, &camera_frame, background, uniforms),
+    )
+    .await
+    {
+        Ok(Ok(frame)) => Ok(Some((frame_number, frame))),
+        Ok(Err(e)) => Err(e),
+        Err(_) => Err("Frame production timed out".to_string()),
+    }
+}
+
 pub struct RenderVideoConstants {
     pub _instance: wgpu::Instance,
     pub _adapter: wgpu::Adapter,
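The second patch bounds the pipeline end to end: the desktop app's unbounded frame channel becomes a bounded tokio mpsc channel (capacity 180 frames), and the renderer produces frames concurrently through buffer_unordered(num_cpus::get()) with a 30-second timeout around produce_frame. One property to keep in mind: buffer_unordered yields results in completion order, not submission order, which is why process_frame returns the frame number alongside the pixels; a consumer that needs strictly ordered frames has to reorder on receipt. Below is a minimal, self-contained sketch of the bounded-channel back-pressure pattern; render_frame, the tiny frame payloads, and the channel capacity of 4 are stand-ins chosen to make the behavior easy to observe, and the sketch assumes only the tokio, futures, and num_cpus dependencies the patches already pull in.

use futures::stream::{self, StreamExt};
use std::time::Duration;
use tokio::sync::mpsc;

// Stand-in for the real decode-plus-composite work done by `process_frame`.
async fn render_frame(frame_number: u32) -> (u32, Vec<u8>) {
    tokio::time::sleep(Duration::from_millis(5)).await;
    (frame_number, vec![0u8; 16])
}

#[tokio::main]
async fn main() {
    // Deliberately tiny capacity so back-pressure kicks in quickly; the patch
    // uses 60 * 3 = 180 (three seconds of 60 fps video).
    let (tx, mut rx) = mpsc::channel::<(u32, Vec<u8>)>(4);

    let producer = tokio::spawn(async move {
        stream::iter(0..32u32)
            .map(render_frame)
            // At most num_cpus::get() frames are in flight at once.
            .buffer_unordered(num_cpus::get())
            .for_each(|frame| {
                let tx = tx.clone();
                async move {
                    // `send` suspends while the channel is full, so the
                    // producer stalls instead of queueing frames without bound.
                    tx.send(frame).await.expect("receiver dropped");
                }
            })
            .await;
    });

    // Drain frames the way the encoder side would.
    let mut received = Vec::new();
    while let Some((frame_number, _data)) = rx.recv().await {
        received.push(frame_number);
    }
    producer.await.unwrap();

    // Every frame arrives exactly once, but not necessarily in order.
    received.sort_unstable();
    assert_eq!(received, (0..32u32).collect::<Vec<_>>());
}

With the bounded channel, peak memory is capped at roughly the channel capacity plus the frames in flight inside buffer_unordered, which is exactly the unbounded growth the old channel allowed whenever encoding fell behind rendering.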