From 37331672cf9db70b41f3ed2140f7476eb6364e96 Mon Sep 17 00:00:00 2001 From: Jaro Habiger Date: Mon, 11 Mar 2024 19:29:58 +0100 Subject: [PATCH] Revert accidentally pushed commits some commits (68782144db2741493be3c313dc0de8347b97b532..68782144db2741493be3c313dc0de8347b97b532) were accidentally pushed to the main branch. this reverts this mistake --- Cargo.lock | 49 +-- Cargo.toml | 6 +- README.md | 5 - src/bin/cli.rs | 42 +-- src/gui/image.rs | 11 +- src/nodes.rs | 28 +- src/nodes_cpu/average.rs | 22 +- src/nodes_cpu/benchmark_sink.rs | 9 +- src/nodes_cpu/bitdepth_convert.rs | 30 +- src/nodes_cpu/dual_frame_raw_decoder.rs | 88 +++--- src/nodes_cpu/mod.rs | 4 +- src/nodes_cpu/sz3.rs | 14 +- src/nodes_cpu/zstd.rs | 22 +- src/nodes_gpu/base_gpu_node.rs | 229 -------------- src/nodes_gpu/bitdepth_convert.rs | 131 ++++++++ src/nodes_gpu/bitdepth_convert_12_8.glsl | 46 +++ src/nodes_gpu/calibrate.rs | 21 +- src/nodes_gpu/color_voodoo.glsl | 59 ++-- src/nodes_gpu/color_voodoo.rs | 180 ++++++----- src/nodes_gpu/debayer.glsl | 53 +++- src/nodes_gpu/debayer.rs | 198 ++++++++----- src/nodes_gpu/display.rs | 28 +- src/nodes_gpu/histogram.rs | 24 +- src/nodes_gpu/lut_3d.glsl | 30 ++ src/nodes_gpu/lut_3d.rs | 202 +++++++------ src/nodes_gpu/mod.rs | 3 +- src/nodes_gpu/plot.rs | 30 +- src/nodes_gpu/shader_util.rs | 279 ------------------ src/nodes_io/frameserver_cinema_dng.rs | 73 +++-- src/nodes_io/reader_cinema_dng.rs | 63 +--- src/nodes_io/reader_raw.rs | 45 +-- src/nodes_io/reader_tcp.rs | 14 +- src/nodes_io/reader_webcam.rs | 22 +- src/nodes_io/writer_cinema_dng.rs | 98 +++--- src/nodes_io/writer_ffmpeg.rs | 31 +- src/nodes_io/writer_raw.rs | 20 +- src/nodes_util/cache.rs | 2 +- src/nodes_util/mod.rs | 1 - src/nodes_util/null_source.rs | 52 ---- src/pipeline_processing/frame.rs | 171 +++++++---- src/pipeline_processing/gpu_util.rs | 20 +- src/pipeline_processing/node.rs | 2 +- src/pipeline_processing/parametrizable.rs | 219 +++++--------- 
src/pipeline_processing/processing_context.rs | 53 +++- src/pipeline_processing/puller.rs | 20 +- 45 files changed, 1225 insertions(+), 1524 deletions(-) delete mode 100644 src/nodes_gpu/base_gpu_node.rs create mode 100644 src/nodes_gpu/bitdepth_convert.rs create mode 100644 src/nodes_gpu/bitdepth_convert_12_8.glsl create mode 100644 src/nodes_gpu/lut_3d.glsl delete mode 100644 src/nodes_gpu/shader_util.rs delete mode 100644 src/nodes_util/null_source.rs diff --git a/Cargo.lock b/Cargo.lock index 47d4844..faa42fc 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -79,9 +79,6 @@ name = "anyhow" version = "1.0.66" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "216261ddc8289130e551ddcd5ce8a064710c0d064a4d2895c67151c92b5443f6" -dependencies = [ - "backtrace", -] [[package]] name = "anymap" @@ -1324,12 +1321,6 @@ dependencies = [ "unicode-width", ] -[[package]] -name = "indoc" -version = "2.0.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "6fe2b9d82064e8a0226fddb3547f37f28eaa46d0fc210e275d835f08cf3b76a7" - [[package]] name = "instant" version = "0.1.12" @@ -2399,7 +2390,6 @@ dependencies = [ "handlebars", "hyper", "indicatif", - "indoc", "itertools", "narui", "num_cpus", @@ -2407,12 +2397,9 @@ dependencies = [ "parking_lot 0.12.1", "pollster", "portpicker", - "regex", "serde", "serde_yaml", - "shaderc", "shlex 1.1.0", - "spirv-reflect", "sz3", "thiserror", "tokio", @@ -2437,9 +2424,9 @@ dependencies = [ [[package]] name = "regex" -version = "1.7.1" +version = "1.7.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "48aaa5748ba571fb95cd2c85c09f629215d3a6ece942baa100950af03a34f733" +checksum = "e076559ef8e241f2ae3479e36f97bd5741c0330689e217ad51ce2c76808b868a" dependencies = [ "aho-corasick", "memchr", @@ -2575,9 +2562,9 @@ dependencies = [ [[package]] name = "shaderc" -version = "0.8.2" +version = "0.8.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = 
"31cef52787a0db5108788ea20bed13d6bf4b96287c5c5201e55725f7070f3443" +checksum = "80e6fe602a861622769530a23bc40bfba31adbf186d0c8412e83f5519c5d6bee" dependencies = [ "libc", "shaderc-sys", @@ -2585,9 +2572,9 @@ dependencies = [ [[package]] name = "shaderc-sys" -version = "0.8.2" +version = "0.8.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "1e8f8439fffcffd6efcd74197204addf935dbab5752696bd990a6cd36d54cf64" +checksum = "3794498651f8173d0afbc0bb8aca45ced111098227e755dde4c0ef2888c8d0bf" dependencies = [ "cmake", "libc", @@ -2690,30 +2677,6 @@ dependencies = [ "lock_api", ] -[[package]] -name = "spirv-reflect" -version = "0.2.3" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "cecc7af6a7d3ca6d15f4d6b5077df89c77ad1f4b314d0cabee221656d041dad7" -dependencies = [ - "bitflags", - "cc", - "num-traits", - "serde", - "serde_derive", - "spirv_headers", -] - -[[package]] -name = "spirv_headers" -version = "1.5.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "1f5b132530b1ac069df335577e3581765995cba5a13995cdbbdbc8fb057c532c" -dependencies = [ - "bitflags", - "num-traits", -] - [[package]] name = "stable_deref_trait" version = "1.2.0" diff --git a/Cargo.toml b/Cargo.toml index 55dc4cb..12ba224 100755 --- a/Cargo.toml +++ b/Cargo.toml @@ -16,7 +16,7 @@ track-drop = ["backtrace"] clap = { version = "3.1.18", features = ["default", "derive"] } indicatif = "0.17.0" glob = "0.3.0" -anyhow = { version = "1.0.57", features = ["backtrace"] } +anyhow = "1.0.57" itertools = "0.10.3" bytemuck = "1.9.1" vulkano = "0.30.0" @@ -50,10 +50,6 @@ tokio = { version = "1.19.2", features = ["full"] } hyper = { version = "0.14.23", features = ["full"] } futures-util = "0.3.25" portpicker = "0.1.1" -shaderc = "0.8.2" -indoc = "2.0.0" -spirv-reflect = "0.2.3" -regex = "1.7.1" [target.'cfg(target_os = "linux")'.dependencies] v4l = "0.13.0" diff --git a/README.md b/README.md index ab64a5a..f5b16fd 100644 --- 
a/README.md +++ b/README.md @@ -91,11 +91,6 @@ The config file supports variable substitution. You can set name value pairs on ## Examples -Display a live stream from the AXIOM Beta (running in raw mode, adjust --device= number accoring to your HDMI capture device): -```shell -$ target/release/cli from-cli WebcamInput --device=0 ! DualFrameRawDecoder ! BitDepthConverter ! Debayer ! Display -``` - Convert a directory of raw12 files recorded previously from the AXIOM Beta to mp4 (h264) using FFmpeg: ```shell $ target/release/cli from-cli RawDirectoryReader --file-pattern '~/Darkbox-Timelapse-Clock-Sequence/*.raw12' --bit-depth 12 --height 3072 --width 4096 --loop true --fps 30 ! BitDepthConverter ! Debayer ! FfmpegWriter --output darkbox.mp4 diff --git a/src/bin/cli.rs b/src/bin/cli.rs index fd65d74..cb7c43e 100644 --- a/src/bin/cli.rs +++ b/src/bin/cli.rs @@ -183,7 +183,7 @@ fn processing_node_from_commandline( let node_descriptor: &ParameterizableDescriptor = available_nodes.get(name).ok_or_else(|| { anyhow!( - "cant find node with name {}. available nodes are: \n{}", + "cant find node with name {}. avalable nodes are: \n{}", name, nodes_usages_string() ) @@ -204,16 +204,13 @@ fn processing_node_from_commandline( Mandatory(NodeInputParameter) | WithDefault(NodeInputParameter, _) ) }) - .filter_map(|(key, parameter_type)| { - results.value_of(key).map(|v| { - Ok(( - key.to_string(), - parameter_type - .get_parameter_type() - .parse(v) - .context(format!("parameter is {}", key))?, - )) - }) + .map(|(key, parameter_type)| { + Ok(( + key.to_string(), + parameter_type + .parse(results.value_of(key)) + .context(format!("parameter is {}", key))?, + )) }) .collect::>()?; @@ -231,7 +228,7 @@ fn clap_app_from_node_name(name: &str) -> Result> { let node_descriptor: &ParameterizableDescriptor = available_nodes.get(name).ok_or_else(|| { anyhow!( - "cant find node with name {}. available nodes are: {:?}", + "cant find node with name {}. 
avalable nodes are: {:?}", name, available_nodes.keys() ) @@ -255,40 +252,23 @@ fn clap_app_from_node_name(name: &str) -> Result> { .allow_hyphen_values(true) .validator(move |v| { parameter_type_for_closure - .get_parameter_type() - .parse(v) + .parse(Some(v)) .map(|_| ()) .map_err(|e| format!("{}", e)) }) .required(true), - WithDefault(BoolParameter, BoolValue(false)) => { - Arg::new(key.as_str()).long(key).takes_value(false).required(false) - } WithDefault(_, default) => Arg::new(key.as_str()) .long(key) .takes_value(true) .allow_hyphen_values(true) .validator(move |v| { parameter_type_for_closure - .get_parameter_type() - .parse(v) + .parse(Some(v)) .map(|_| ()) .map_err(|e| format!("{}", e)) }) .default_value(Box::leak(Box::new(default.to_string()))) .required(false), - Optional(_) => Arg::new(key.as_str()) - .long(key) - .takes_value(true) - .allow_hyphen_values(true) - .validator(move |v| { - parameter_type_for_closure - .get_parameter_type() - .parse(v) - .map(|_| ()) - .map_err(|e| format!("{}", e)) - }) - .required(false), }) } Ok(app) diff --git a/src/gui/image.rs b/src/gui/image.rs index 03fada9..5f482b1 100644 --- a/src/gui/image.rs +++ b/src/gui/image.rs @@ -1,4 +1,7 @@ -use crate::pipeline_processing::{buffers::GpuBuffer, frame::Frame}; +use crate::pipeline_processing::{ + buffers::GpuBuffer, + frame::{Frame, Rgb}, +}; use derivative::Derivative; use narui::{layout::Maximal, *}; use std::sync::Arc; @@ -117,7 +120,7 @@ impl PartialEq for ArcPartialEqHelper { } #[widget] -pub fn image(image: Arc>, context: &mut WidgetContext) -> FragmentInner { +pub fn image(image: Arc>, context: &mut WidgetContext) -> FragmentInner { let cloned_image = ArcPartialEqHelper(image.clone()); let device = context.vulkan_context.device.clone(); @@ -156,8 +159,8 @@ pub fn image(image: Arc>, context: &mut WidgetContext) -> Fragm origin: origin.into(), size: size.into(), z_index, - width: image.interpretation.width as u32, - height: image.interpretation.height as u32, + width: 
image.interp.width as u32, + height: image.interp.height as u32, _dummy0: Default::default(), }; diff --git a/src/nodes.rs b/src/nodes.rs index 9c6df13..8210c3a 100644 --- a/src/nodes.rs +++ b/src/nodes.rs @@ -6,13 +6,15 @@ use crate::nodes_gpu::plot::Plot; use crate::nodes_io::reader_webcam::WebcamInput; use crate::{ nodes_cpu::{ - //average::Average, + average::Average, benchmark_sink::BenchmarkSink, + bitdepth_convert::BitDepthConverter, dual_frame_raw_decoder::{DualFrameRawDecoder, ReverseDualFrameRawDecoder}, - //sz3::SZ3Compress, + sz3::SZ3Compress, zstd::ZstdBlobReader, }, nodes_gpu::{ + bitdepth_convert::GpuBitDepthConverter, calibrate::Calibrate, color_voodoo::ColorVoodoo, debayer::Debayer, @@ -33,14 +35,10 @@ use crate::{ processing_context::ProcessingContext, }, }; -use crate::{ - nodes_gpu::base_gpu_node::GpuNodeImpl, - nodes_io::{frameserver_cinema_dng::CinemaDngFrameserver, writer_ffmpeg::FfmpegWriter}, - nodes_util::null_source::NullFrameSource, -}; + +use crate::nodes_io::{frameserver_cinema_dng::CinemaDngFrameserver, writer_ffmpeg::FfmpegWriter}; use anyhow::{anyhow, Result}; use std::collections::HashMap; - macro_rules! generate_dynamic_node_creation_functions { ($($(#[$m:meta])? $x:ty),+ $(,)?) => { pub fn list_available_nodes() -> HashMap { @@ -67,27 +65,28 @@ macro_rules! 
generate_dynamic_node_creation_functions { }; } - generate_dynamic_node_creation_functions![ RawDirectoryReader, RawBlobReader, CinemaDngWriter, CinemaDngReader, - GpuNodeImpl, - GpuNodeImpl, - GpuNodeImpl, + GpuBitDepthConverter, + Debayer, #[cfg(target_os = "linux")] Display, + BitDepthConverter, DualFrameRawDecoder, ReverseDualFrameRawDecoder, BenchmarkSink, + ColorVoodoo, RawDirectoryWriter, RawBlobWriter, - //Average, + Lut3d, + Average, TcpReader, Cache, Split, - //SZ3Compress, + SZ3Compress, ZstdBlobReader, Calibrate, Histogram, @@ -97,5 +96,4 @@ generate_dynamic_node_creation_functions![ WebcamInput, FfmpegWriter, CinemaDngFrameserver, - NullFrameSource, ]; diff --git a/src/nodes_cpu/average.rs b/src/nodes_cpu/average.rs index 2c26924..5d5dc8b 100644 --- a/src/nodes_cpu/average.rs +++ b/src/nodes_cpu/average.rs @@ -1,7 +1,7 @@ use crate::{ pipeline_processing::{ buffers::ChunkedCpuBuffer, - frame::{Frame}, + frame::{Frame, Raw}, node::{Caps, InputProcessingNode, NodeID, ProcessingNode, Request}, parametrizable::prelude::*, payload::Payload, @@ -37,7 +37,7 @@ impl Parameterizable for Average { Ok(Self { input: parameters.take("input")?, num_frames: parameters.take::("n")? 
as usize, - produce_std: parameters.has("std"), + produce_std: parameters.take("std")?, last_frame_info: Default::default(), context: context.clone(), }) @@ -64,18 +64,18 @@ impl ProcessingNode for Average { let context = &self.context; let frame = - context.ensure_cpu_buffer_frame(&input).context("Wrong input format for Average")?; - let interpretation = frame.interpretation.clone(); - assert_eq!(frame.interpretation.bit_depth, 12); + context.ensure_cpu_buffer::(&input).context("Wrong input format for Average")?; + let interp = frame.interp; + assert_eq!(frame.interp.bit_depth, 12); // println!("[{frame_number}] adding {n}"); // f32 -> 4 bytes per pixel let out_buffer_avg = - unsafe { context.get_uninit_cpu_buffer((interpretation.height * interpretation.width * 4) as usize) }; + unsafe { context.get_uninit_cpu_buffer((interp.height * interp.width * 4) as usize) }; let out_buffer_std = - unsafe { context.get_uninit_cpu_buffer((interpretation.height * interpretation.width * 4) as usize) }; + unsafe { context.get_uninit_cpu_buffer((interp.height * interp.width * 4) as usize) }; let out = Arc::new(ChunkedCpuBuffer::::new( [out_buffer_avg, out_buffer_std], @@ -127,7 +127,7 @@ impl ProcessingNode for Average { _ => {} } let input = input?; - let frame = context_copy.ensure_cpu_buffer(&input).context("Wrong input format for Average")?; + let frame = context_copy.ensure_cpu_buffer::(&input).context("Wrong input format for Average")?; assert_eq!(interp.bit_depth, 12); frame.storage.as_slice_async(|frame: &[u8]| async move { @@ -198,10 +198,10 @@ impl ProcessingNode for Average { } }); - let mut interp = interpretation; + let mut interp = interp; interp.bit_depth = 32; - let avg_frame = Frame { storage: avg_buffer, interpretation: interp }; - let std_frame = Frame { storage: std_buffer, interpretation: interp }; + let avg_frame = Frame { storage: avg_buffer, interp }; + let std_frame = Frame { storage: std_buffer, interp }; if self.produce_std { 
Ok(Payload::from(vec![Payload::from(avg_frame), Payload::from(std_frame)])) diff --git a/src/nodes_cpu/benchmark_sink.rs b/src/nodes_cpu/benchmark_sink.rs index 7e4bc11..8ef140f 100644 --- a/src/nodes_cpu/benchmark_sink.rs +++ b/src/nodes_cpu/benchmark_sink.rs @@ -6,7 +6,6 @@ use crate::pipeline_processing::{ }; use anyhow::Result; use async_trait::async_trait; - use std::{sync::Arc, time::Instant}; @@ -21,7 +20,7 @@ impl Parameterizable for BenchmarkSink { fn describe_parameters() -> ParametersDescriptor { ParametersDescriptor::new() .with("input", Mandatory(NodeInputParameter)) - .with("priority", WithDefault(U8(), ParameterValue::IntRangeValue(0))) + .with("priority", Optional(U8())) } fn from_parameters( @@ -65,7 +64,7 @@ impl SinkNode for BenchmarkSink { self.priority, progress_callback.clone(), self.input.clone_for_same_puller(), - None, + 0, move |_input, _frame_number| Ok(()), ) .await; @@ -80,7 +79,7 @@ impl SinkNode for BenchmarkSink { self.priority, progress_callback.clone(), self.input.clone_for_same_puller(), - None, + 0, move |_input, _frame_number| Ok(()), ) .await?; @@ -102,7 +101,7 @@ impl SinkNode for BenchmarkSink { self.priority, progress_callback, self.input.clone_for_same_puller(), - None, + 0, ); let reporter = FPSReporter::new("pipeline"); loop { diff --git a/src/nodes_cpu/bitdepth_convert.rs b/src/nodes_cpu/bitdepth_convert.rs index 0ec0d8d..d7acae1 100644 --- a/src/nodes_cpu/bitdepth_convert.rs +++ b/src/nodes_cpu/bitdepth_convert.rs @@ -7,7 +7,7 @@ use anyhow::{Context, Result}; use crate::pipeline_processing::{ - frame::{Frame, FrameInterpretation, SampleInterpretation}, + frame::{Frame, FrameInterpretation, Raw}, node::{Caps, NodeID, ProcessingNode, Request}, parametrizable::prelude::*, processing_context::ProcessingContext, @@ -38,18 +38,14 @@ impl ProcessingNode for BitDepthConverter { let input = self.input.pull(request).await?; let frame = self .context - .ensure_cpu_buffer_frame(&input) + .ensure_cpu_buffer::(&input) 
.context("Wrong input format for BitDepthConverter")?; - let interpretation = FrameInterpretation { - sample_interpretation: SampleInterpretation::UInt(8), - ..frame.interpretation.clone() - }; - let mut new_buffer = - unsafe { self.context.get_uninit_cpu_buffer(interpretation.required_bytes()) }; + let interp = Raw { bit_depth: 8, ..frame.interp }; + let mut new_buffer = unsafe { self.context.get_uninit_cpu_buffer(interp.required_bytes()) }; - if let SampleInterpretation::UInt(8) = frame.interpretation.sample_interpretation { + if frame.interp.bit_depth == 8 { return Ok(input); - } else if let SampleInterpretation::UInt(12) = frame.interpretation.sample_interpretation { + } else if frame.interp.bit_depth == 12 { new_buffer.as_mut_slice(|new_buffer| { frame.storage.as_slice(|frame_storage| { for (input, output) in @@ -60,21 +56,21 @@ impl ProcessingNode for BitDepthConverter { } }) }); - } else if let SampleInterpretation::UInt(bits) = frame.interpretation.sample_interpretation - { + } else { let mut rest_value: u32 = 0; let mut rest_bits: u32 = 0; let mut pos = 0; new_buffer.as_mut_slice(|new_buffer| { frame.storage.as_slice(|frame_storage| { for value in frame_storage.iter() { - let bits_more_than_bit_depth = (rest_bits as i32 + 8) - bits as i32; + let bits_more_than_bit_depth = + (rest_bits as i32 + 8) - frame.interp.bit_depth as i32; if bits_more_than_bit_depth >= 0 { let new_n_bit_value: u32 = rest_value - .wrapping_shl(bits as u32 - rest_bits) + .wrapping_shl(frame.interp.bit_depth as u32 - rest_bits) | value.wrapping_shr(8 - bits_more_than_bit_depth as u32) as u32; - new_buffer[pos] = (if bits > 8 { - new_n_bit_value.wrapping_shr(bits as u32 - 8) + new_buffer[pos] = (if frame.interp.bit_depth > 8 { + new_n_bit_value.wrapping_shr(frame.interp.bit_depth as u32 - 8) } else { new_n_bit_value } as u8); @@ -91,7 +87,7 @@ impl ProcessingNode for BitDepthConverter { }); } - let new_frame = Frame { storage: new_buffer, interpretation }; + let new_frame = Frame { 
storage: new_buffer, interp }; Ok(Payload::from(new_frame)) } diff --git a/src/nodes_cpu/dual_frame_raw_decoder.rs b/src/nodes_cpu/dual_frame_raw_decoder.rs index 4309aea..311f1a0 100644 --- a/src/nodes_cpu/dual_frame_raw_decoder.rs +++ b/src/nodes_cpu/dual_frame_raw_decoder.rs @@ -1,14 +1,7 @@ use crate::{ pipeline_processing::{ buffers::CpuBuffer, - frame::{ - CfaDescriptor, - ColorInterpretation, - Compression, - Frame, - FrameInterpretation, - SampleInterpretation, - }, + frame::{CfaDescriptor, Frame, FrameInterpretation, Raw, Rgb}, node::{Caps, InputProcessingNode, NodeID, ProcessingNode, Request}, parametrizable::{prelude::*, Parameterizable, Parameters, ParametersDescriptor}, payload::Payload, @@ -16,7 +9,7 @@ use crate::{ }, util::async_notifier::AsyncNotifier, }; -use anyhow::{anyhow, bail, Context, Result}; +use anyhow::{anyhow, Context, Result}; use async_trait::async_trait; use futures::join; use std::sync::Arc; @@ -24,7 +17,7 @@ use std::sync::Arc; const FRAME_A_MARKER: u8 = 0xAA; #[derive(Clone, Default)] -struct LastFrameInfo(u64, u64, u8, Option>>); +struct LastFrameInfo(u64, u64, u8, Option>>); pub struct DualFrameRawDecoder { input: InputProcessingNode, @@ -38,7 +31,8 @@ impl Parameterizable for DualFrameRawDecoder { ParametersDescriptor::new() .with("input", Mandatory(NodeInputParameter)) .with("debug", Optional(BoolParameter)) - .with("bayer", WithDefault(StringParameter, StringValue("RGBG".to_string()))) + .with("red-in-first-col", WithDefault(BoolParameter, BoolValue(true))) + .with("red-in-first-row", WithDefault(BoolParameter, BoolValue(false))) } fn from_parameters( @@ -46,18 +40,14 @@ impl Parameterizable for DualFrameRawDecoder { _is_input_to: &[NodeID], context: &ProcessingContext, ) -> Result { - let cfa_descriptor = match parameters.take::("bayer")?.to_uppercase().as_str() { - "RGBG" => CfaDescriptor { red_in_first_col: true, red_in_first_row: true }, - "BGRG" => CfaDescriptor { red_in_first_col: true, red_in_first_row: false }, - 
"GBGR" => CfaDescriptor { red_in_first_col: false, red_in_first_row: true }, - "GRGB" => CfaDescriptor { red_in_first_col: false, red_in_first_row: true }, - _ => bail!("couldn't parse CFA Pattern"), - }; Ok(Self { input: parameters.take("input")?, - cfa_descriptor, + cfa_descriptor: CfaDescriptor { + red_in_first_col: parameters.take("red-in-first-col")?, + red_in_first_row: parameters.take("red-in-first-row")?, + }, last_frame_info: Default::default(), - debug: parameters.has("debug"), + debug: parameters.take("debug")?, context: context.clone(), }) } @@ -81,7 +71,7 @@ impl ProcessingNode for DualFrameRawDecoder { let frame = self.input.pull(request.with_frame_number(next_even)).await?; let frame_b = self .context - .ensure_cpu_buffer_frame(&frame) + .ensure_cpu_buffer::(&frame) .context("Wrong input format for DualFrameRawDecoder")?; Result::<_>::Ok(((frame_a, frame_b), true)) } @@ -92,11 +82,11 @@ impl ProcessingNode for DualFrameRawDecoder { ); let frame_a = self .context - .ensure_cpu_buffer_frame(&frames.0?) + .ensure_cpu_buffer::(&frames.0?) .context("Wrong input format for DualFrameRawDecoder")?; let frame_b = self .context - .ensure_cpu_buffer_frame(&frames.1?) + .ensure_cpu_buffer::(&frames.1?) 
.context("Wrong input format for DualFrameRawDecoder")?; Result::<_>::Ok(((frame_a, frame_b), false)) } @@ -158,19 +148,17 @@ impl ProcessingNode for DualFrameRawDecoder { return Err(anyhow!("frame slipped in DualFrameRawDecoder:\n{}", debug_info)); } - let interpretation = FrameInterpretation { - width: frame_a.interpretation.width * 2, - height: frame_a.interpretation.height * 2, - fps: frame_a.interpretation.fps.map(|v| v / 2.0), - color_interpretation: ColorInterpretation::Bayer(self.cfa_descriptor), - sample_interpretation: SampleInterpretation::UInt(8), - compression: Compression::Uncompressed, + let interp = Raw { + width: frame_a.interp.width * 2, + height: frame_a.interp.height * 2, + bit_depth: 12, + cfa: self.cfa_descriptor, + fps: frame_a.interp.fps / 2.0, }; - let mut new_buffer = - unsafe { self.context.get_uninit_cpu_buffer(interpretation.required_bytes()) }; + let mut new_buffer = unsafe { self.context.get_uninit_cpu_buffer(interp.required_bytes()) }; - let line_bytes = frame_a.interpretation.width as usize * 3; + let line_bytes = frame_a.interp.width as usize * 3; frame_a.storage.as_slice(|frame_a| { frame_b.storage.as_slice(|frame_b| { let (frame_a, frame_b) = if frame_a[2] == FRAME_A_MARKER { @@ -193,7 +181,7 @@ impl ProcessingNode for DualFrameRawDecoder { }) }); - Ok(Payload::from(Frame { interpretation, storage: new_buffer })) + Ok(Payload::from(Frame { interp, storage: new_buffer })) } fn get_caps(&self) -> Caps { @@ -217,7 +205,9 @@ impl Parameterizable for ReverseDualFrameRawDecoder { // Should also be transparently fixed by DualFrameRawDecoder .with( "flip", - Optional(BoolParameter), + Optional( + BoolParameter + ), ) } @@ -239,23 +229,14 @@ impl ProcessingNode for ReverseDualFrameRawDecoder { async fn pull(&self, request: Request) -> Result { let downstream = request.frame_number() / 2; let frame = self.input.pull(request.with_frame_number(downstream)).await?; - let frame = self.context.ensure_cpu_buffer_frame(&frame)?; - - if 
!matches!(frame.interpretation.sample_interpretation, SampleInterpretation::UInt(12)) { - bail!("A frame with bit_depth=12 is required. Convert the bit depth of the frame!") - } - if !matches!(frame.interpretation.color_interpretation, ColorInterpretation::Bayer(_cfa)) { - bail!("A frame with bayer pattern is expected!") - } - + let frame = self.context.ensure_cpu_buffer::(&frame)?; let offset = if self.flip { 1 } else { 0 }; let offset = ((request.frame_number() + offset) % 2) as usize; - let line_bytes = (frame.interpretation.width * 3 / 2) as usize; + let line_bytes = (frame.interp.width * 3 / 2) as usize; let out_buffer = unsafe { - let mut buffer = self - .context - .get_uninit_cpu_buffer(line_bytes * frame.interpretation.height as usize / 2); + let mut buffer = + self.context.get_uninit_cpu_buffer(line_bytes * frame.interp.height as usize / 2); buffer.as_mut_slice(|buffer| { frame.storage.as_slice(|input| { for (out, input) in buffer @@ -271,13 +252,10 @@ impl ProcessingNode for ReverseDualFrameRawDecoder { }; Ok(Payload::from(Frame { - interpretation: FrameInterpretation { - width: frame.interpretation.width / 2, - height: frame.interpretation.height / 2, - fps: frame.interpretation.fps.map(|v| v * 2.0), - color_interpretation: ColorInterpretation::Rgb, - sample_interpretation: SampleInterpretation::UInt(8), - compression: Compression::Uncompressed, + interp: Rgb { + width: frame.interp.width / 2, + height: frame.interp.height / 2, + fps: frame.interp.fps * 2.0, }, storage: out_buffer, })) diff --git a/src/nodes_cpu/mod.rs b/src/nodes_cpu/mod.rs index e2bbd59..acc06f2 100644 --- a/src/nodes_cpu/mod.rs +++ b/src/nodes_cpu/mod.rs @@ -1,6 +1,6 @@ -//pub mod average; +pub mod average; pub mod benchmark_sink; pub mod bitdepth_convert; pub mod dual_frame_raw_decoder; -//pub mod sz3; +pub mod sz3; pub mod zstd; diff --git a/src/nodes_cpu/sz3.rs b/src/nodes_cpu/sz3.rs index 33387e8..5d828c8 100644 --- a/src/nodes_cpu/sz3.rs +++ b/src/nodes_cpu/sz3.rs @@ -1,5 +1,5 @@ 
use crate::pipeline_processing::{ - frame::{Frame, Bayer, Rgb, SZ3Compressed}, + frame::{Frame, Raw, Rgb, SZ3Compressed}, node::{Caps, InputProcessingNode, NodeID, ProcessingNode, Request}, parametrizable::prelude::*, payload::Payload, @@ -83,11 +83,11 @@ impl ProcessingNode for SZ3Compress { async fn pull(&self, request: Request) -> Result { let input = self.input.pull(request).await?; let (bytes, frame_dims, interp) = - if let Ok(frame) = self.context.ensure_cpu_buffer::(&input) { + if let Ok(frame) = self.context.ensure_cpu_buffer::(&input) { ( frame.storage.clone(), - vec![frame.interpretation.width as _, frame.interpretation.height as _], - Arc::new(frame.interpretation) as Arc<_>, + vec![frame.interp.width as _, frame.interp.height as _], + Arc::new(frame.interp) as Arc<_>, ) } else { let frame = self @@ -96,8 +96,8 @@ impl ProcessingNode for SZ3Compress { .context("Wrong input format for SZ3Compress")?; ( frame.storage.clone(), - vec![3, frame.interpretation.width as _, frame.interpretation.height as _], - Arc::new(frame.interpretation) as Arc<_>, + vec![3, frame.interp.width as _, frame.interp.height as _], + Arc::new(frame.interp) as Arc<_>, ) }; @@ -142,7 +142,7 @@ impl ProcessingNode for SZ3Compress { buffer }; - let new_frame = Frame { interpretation: SZ3Compressed::new(interp, buffer.len()), storage: buffer }; + let new_frame = Frame { interp: SZ3Compressed::new(interp, buffer.len()), storage: buffer }; Ok(Payload::from(new_frame)) } diff --git a/src/nodes_cpu/zstd.rs b/src/nodes_cpu/zstd.rs index d2c61f1..e2852bb 100644 --- a/src/nodes_cpu/zstd.rs +++ b/src/nodes_cpu/zstd.rs @@ -1,6 +1,6 @@ use crate::{ pipeline_processing::{ - frame::{Frame, FrameInterpretation}, + frame::{Frame, FrameInterpretation, FrameInterpretations}, node::{Caps, EOFError, NodeID, ProcessingNode, Request}, parametrizable::prelude::*, payload::Payload, @@ -20,7 +20,7 @@ pub struct ZstdBlobReader { u64, Arc>>>, )>, - interpretation: FrameInterpretation, + interp: 
FrameInterpretations, context: ProcessingContext, } impl Parameterizable for ZstdBlobReader { @@ -38,13 +38,13 @@ impl Parameterizable for ZstdBlobReader { let path: String = options.take("file")?; let file = std::fs::File::open(path)?; - let interpretation = options.get_interpretation()?; + let interp = options.get_interpretation()?; Ok(Self { frame_and_file: AsyncNotifier::new(( 0, Arc::new(Mutex::new(zstd::stream::read::Decoder::new(file)?)), )), - interpretation, + interp, context: context.clone(), }) } @@ -64,14 +64,22 @@ impl ProcessingNode for ZstdBlobReader { let mut decoder = decoder.lock().unwrap(); let mut buffer = - unsafe { self.context.get_uninit_cpu_buffer(self.interpretation.required_bytes()) }; + unsafe { self.context.get_uninit_cpu_buffer(self.interp.required_bytes()) }; + // dbg!(self.interp.required_bytes()); buffer.as_mut_slice(move |buffer| { decoder.read_exact(buffer).context(EOFError)?; - Result::<_, anyhow::Error>::Ok(()) + // dbg!(buffer[0], buffer[1], buffer[2]); + anyhow::Result::<_, anyhow::Error>::Ok(()) })?; self.frame_and_file.update(|(frame_no, _)| *frame_no = frame_number + 1); - Ok(Payload::from(Frame { interpretation: self.interpretation.clone(), storage: buffer })) + let payload = match self.interp { + FrameInterpretations::Raw(interp) => Payload::from(Frame { storage: buffer, interp }), + FrameInterpretations::Rgb(interp) => Payload::from(Frame { storage: buffer, interp }), + FrameInterpretations::Rgba(interp) => Payload::from(Frame { storage: buffer, interp }), + }; + + Ok(payload) } fn get_caps(&self) -> Caps { Caps { frame_count: None, random_access: false } } diff --git a/src/nodes_gpu/base_gpu_node.rs b/src/nodes_gpu/base_gpu_node.rs deleted file mode 100644 index e49cf8f..0000000 --- a/src/nodes_gpu/base_gpu_node.rs +++ /dev/null @@ -1,229 +0,0 @@ -use crate::{ - nodes_gpu::shader_util::{compile_shader, generate_single_node_shader}, - pipeline_processing::{ - buffers::GpuBuffer, - frame::{Frame, FrameInterpretation}, - 
gpu_util::ensure_gpu_buffer_frame, - node::{Caps, InputProcessingNode, NodeID, ProcessingNode, Request}, - parametrizable::{ - prelude::*, - Parameterizable, - ParameterizableDescriptor, - Parameters, - ParametersDescriptor, - }, - payload::Payload, - processing_context::ProcessingContext, - }, -}; -use anyhow::{anyhow, bail, Context, Result}; -use async_trait::async_trait; -use parking_lot::RwLock; -use std::{collections::HashMap, sync::Arc}; -use vulkano::{ - buffer::{BufferAccess, BufferUsage, DeviceLocalBuffer}, - command_buffer::{AutoCommandBufferBuilder, CommandBufferUsage::OneTimeSubmit}, - descriptor_set::{PersistentDescriptorSet, WriteDescriptorSet}, - device::{Device, Queue}, - image::ImageViewAbstract, - pipeline::{ComputePipeline, Pipeline, PipelineBindPoint}, - sampler::Sampler, - sync::GpuFuture, - DeviceSize, -}; - -#[derive(Clone)] -pub enum BindingValue { - U32(u32), - F32(f32), - Sampler((Arc, Arc)), - Buffer(Arc), -} - -pub trait GpuNode: Parameterizable { - fn get_glsl(&self) -> String; - fn get_binding( - &self, - frame_interpretation: &FrameInterpretation, - ) -> Result>; - fn get_interpretation(&self, frame_interpretation: FrameInterpretation) -> FrameInterpretation { - frame_interpretation - } -} - -#[derive(Clone)] -struct PipelineCacheItem { - tag: FrameInterpretation, - pipeline: Arc, - push_constant_names: Vec<(String, u32)>, - binding_names: Vec<(String, u32)>, -} - -pub struct GpuNodeImpl { - gpu_node: T, - device: Arc, - pipeline: Arc>>, - queue: Arc, - input: InputProcessingNode, -} - -impl Parameterizable for GpuNodeImpl { - fn describe_parameters() -> ParametersDescriptor { - T::describe_parameters().with("input", Mandatory(NodeInputParameter)) - } - fn from_parameters( - mut parameters: Parameters, - is_input_to: &[NodeID], - context: &ProcessingContext, - ) -> Result - where - Self: Sized, - { - let input = parameters.take("input")?; - let gpu_node = T::from_parameters(parameters, is_input_to, context)?; - - let (device, queues) 
= context.require_vulkan()?; - let queue = queues.iter().find(|&q| q.family().supports_compute()).unwrap().clone(); - - let pipeline = Default::default(); - - Ok(Self { gpu_node, device, pipeline, queue, input }) - } - - fn get_name() -> String { T::get_name() } - fn describe() -> ParameterizableDescriptor { T::describe() } -} - -#[async_trait] -impl ProcessingNode for GpuNodeImpl { - async fn pull(&self, request: Request) -> Result { - let input = self.input.pull(request).await?; - let (frame, fut) = ensure_gpu_buffer_frame(&input, self.queue.clone()) - .context(format!("Wrong input format for node {}", Self::get_name()))?; - - let mut binding = self.gpu_node.get_binding(&frame.interpretation)?; - - let output_interpretation = self.gpu_node.get_interpretation(frame.interpretation); - - - // TODO: this is racy - if self.pipeline.read().is_none() - || self.pipeline.read().as_ref().unwrap().tag != frame.interpretation - { - let shader_code = generate_single_node_shader( - self.gpu_node.get_glsl(), - frame.interpretation, - output_interpretation, - )?; - let spirv = compile_shader(&shader_code) - .context(format!("compilation error. shader code is:\n{shader_code}"))?; - - let shader = unsafe { - vulkano::shader::ShaderModule::from_words(self.device.clone(), &spirv.as_binary()) - }?; - let pipeline = ComputePipeline::new( - self.device.clone(), - shader.entry_point("main").unwrap(), - &(), - None, - |_| {}, - )?; - - let reflection = spirv_reflect::ShaderModule::load_u32_data(spirv.as_binary()) - .map_err(|e| anyhow!(e))?; - - let mut push_constant_names = Vec::new(); - for block in - reflection.enumerate_push_constant_blocks(Some("main")).map_err(|e| anyhow!(e))? - { - for member in block.members { - push_constant_names.push((member.name.clone(), member.absolute_offset)) - } - } - - let mut binding_names = Vec::new(); - for binding in - reflection.enumerate_descriptor_bindings(Some("main")).map_err(|e| anyhow!(e))? 
- { - binding_names.push((binding.name, binding.binding)); - } - - self.pipeline.write().replace(PipelineCacheItem { - tag: frame.interpretation.clone(), - pipeline, - push_constant_names, - binding_names, - }); - } - - let PipelineCacheItem { pipeline, push_constant_names, binding_names, .. } = - self.pipeline.read().clone().unwrap(); - - let sink_buffer = DeviceLocalBuffer::<[u8]>::array( - self.device.clone(), - output_interpretation.required_bytes() as DeviceSize, - BufferUsage { storage_buffer: true, transfer_src: true, ..BufferUsage::none() }, - std::iter::once(self.queue.family()), - )?; - binding.insert("source".to_string(), BindingValue::Buffer(frame.storage.untyped())); - binding.insert("sink".to_string(), BindingValue::Buffer(sink_buffer.clone())); - - let layout = pipeline.layout().set_layouts()[0].clone(); - let set = PersistentDescriptorSet::new( - layout, - binding_names.iter().map(|(name, n)| match binding.get(name) { - Some(BindingValue::Buffer(buffer)) => { - WriteDescriptorSet::buffer(*n, buffer.clone()) - } - Some(BindingValue::Sampler((image_view, sampler))) => { - WriteDescriptorSet::image_view_sampler(*n, image_view.clone(), sampler.clone()) - } - Some(_) => panic!("invalid type for binding"), - None => panic!("not all binding values were supplied"), - }), - ) - .unwrap(); - - let mut builder = AutoCommandBufferBuilder::primary( - self.device.clone(), - self.queue.family(), - OneTimeSubmit, - ) - .unwrap(); - builder.bind_descriptor_sets(PipelineBindPoint::Compute, pipeline.layout().clone(), 0, set); - - for (name, absolute_offset) in push_constant_names { - let value = binding - .get(&name) - .ok_or_else(|| anyhow!("couldn't find binding value for push constant '{name}'"))?; - match value.clone() { - BindingValue::U32(v) => { - builder.push_constants(pipeline.layout().clone(), absolute_offset, v); - } - BindingValue::F32(v) => { - builder.push_constants(pipeline.layout().clone(), absolute_offset, v); - } - BindingValue::Sampler(_) => 
bail!("Samplers are not Push Constants"), - BindingValue::Buffer(_) => bail!("Buffers are not Push Constants"), - } - } - - builder.bind_pipeline_compute(pipeline.clone()).dispatch([ - (output_interpretation.width as u32 + 255) / 256, - (output_interpretation.height as u32 + 3) / 4, - 1, - ])?; - let command_buffer = builder.build()?; - - let future = - fut.then_execute(self.queue.clone(), command_buffer)?.then_signal_fence_and_flush()?; - - future.wait(None).unwrap(); - Ok(Payload::from(Frame { - interpretation: output_interpretation, - storage: GpuBuffer::from(sink_buffer), - })) - } - - fn get_caps(&self) -> Caps { self.input.get_caps() } -} diff --git a/src/nodes_gpu/bitdepth_convert.rs b/src/nodes_gpu/bitdepth_convert.rs new file mode 100644 index 0000000..8b63297 --- /dev/null +++ b/src/nodes_gpu/bitdepth_convert.rs @@ -0,0 +1,131 @@ +use crate::pipeline_processing::{ + buffers::GpuBuffer, + frame::{Frame, FrameInterpretation, Raw}, + gpu_util::ensure_gpu_buffer, + node::{Caps, InputProcessingNode, NodeID, ProcessingNode, Request}, + parametrizable::prelude::*, + payload::Payload, + processing_context::ProcessingContext, +}; +use anyhow::{anyhow, Context, Result}; +use async_trait::async_trait; +use std::sync::Arc; +use vulkano::{ + buffer::{BufferUsage, DeviceLocalBuffer}, + command_buffer::{AutoCommandBufferBuilder, CommandBufferUsage::OneTimeSubmit}, + descriptor_set::{persistent::PersistentDescriptorSet, WriteDescriptorSet}, + device::{Device, Queue}, + pipeline::{ComputePipeline, Pipeline, PipelineBindPoint}, + sync::GpuFuture, + DeviceSize, +}; + +// generated by the macro +#[allow(clippy::needless_question_mark)] +mod compute_shader { + vulkano_shaders::shader! 
{ + ty: "compute", + path: "src/nodes_gpu/bitdepth_convert_12_8.glsl" + } +} + +pub struct GpuBitDepthConverter { + device: Arc, + pipeline: Arc, + queue: Arc, + input: InputProcessingNode, +} + +impl Parameterizable for GpuBitDepthConverter { + fn describe_parameters() -> ParametersDescriptor { + ParametersDescriptor::new().with("input", Mandatory(NodeInputParameter)) + } + fn from_parameters( + mut parameters: Parameters, + _is_input_to: &[NodeID], + context: &ProcessingContext, + ) -> Result + where + Self: Sized, + { + let (device, queues) = context.require_vulkan()?; + let queue = queues.iter().find(|&q| q.family().supports_compute()).unwrap().clone(); + + let shader = compute_shader::load(device.clone()).unwrap(); + let pipeline = ComputePipeline::new( + device.clone(), + shader.entry_point("main").unwrap(), + &(), + None, + |_| {}, + ) + .unwrap(); + + Ok(GpuBitDepthConverter { device, pipeline, queue, input: parameters.take("input")? }) + } +} + +#[async_trait] +impl ProcessingNode for GpuBitDepthConverter { + async fn pull(&self, request: Request) -> Result { + let input = self.input.pull(request).await?; + + let (frame, fut) = ensure_gpu_buffer::(&input, self.queue.clone()) + .context("Wrong input format for GpuBitDepthConvert")?; + + if frame.interp.bit_depth != 12 { + return Err(anyhow!( + "A frame with bit_depth=12 is required. Convert the bit depth of the frame!" 
+ )); + } + + let interp = Raw { bit_depth: 8, ..frame.interp }; + let sink_buffer = DeviceLocalBuffer::<[u8]>::array( + self.device.clone(), + interp.required_bytes() as DeviceSize, + BufferUsage { storage_buffer: true, transfer_src: true, ..BufferUsage::none() }, + std::iter::once(self.queue.family()), + )?; + + let push_constants = compute_shader::ty::PushConstantData { + width: interp.width as u32, + height: interp.height as u32, + }; + + let layout = self.pipeline.layout().set_layouts()[0].clone(); + let set = PersistentDescriptorSet::new( + layout, + [ + WriteDescriptorSet::buffer(0, frame.storage.untyped()), + WriteDescriptorSet::buffer(1, sink_buffer.clone()), + ], + ) + .unwrap(); + + let mut builder = AutoCommandBufferBuilder::primary( + self.device.clone(), + self.queue.family(), + OneTimeSubmit, + ) + .unwrap(); + builder + .bind_descriptor_sets( + PipelineBindPoint::Compute, + self.pipeline.layout().clone(), + 0, + set, + ) + .push_constants(self.pipeline.layout().clone(), 0, push_constants) + .bind_pipeline_compute(self.pipeline.clone()) + .dispatch([(interp.width as u32 + 31) / 16 / 2, (interp.height as u32 + 31) / 32, 1])?; + let command_buffer = builder.build()?; + + let future = + fut.then_execute(self.queue.clone(), command_buffer)?.then_signal_fence_and_flush()?; + + future.wait(None).unwrap(); + Ok(Payload::from(Frame { interp, storage: GpuBuffer::from(sink_buffer) })) + } + + fn get_caps(&self) -> Caps { self.input.get_caps() } +} diff --git a/src/nodes_gpu/bitdepth_convert_12_8.glsl b/src/nodes_gpu/bitdepth_convert_12_8.glsl new file mode 100644 index 0000000..2bf05fd --- /dev/null +++ b/src/nodes_gpu/bitdepth_convert_12_8.glsl @@ -0,0 +1,46 @@ +#version 450 +#extension GL_EXT_shader_explicit_arithmetic_types: enable +#extension GL_EXT_shader_explicit_arithmetic_types_int8: require + +layout(local_size_x = 16, local_size_y = 32, local_size_z = 1) in; + +layout(push_constant) uniform PushConstantData { + uint width; + uint height; +} params; + 
+layout(set = 0, binding = 0) buffer readonly Source { uint8_t data[]; } source; +layout(set = 0, binding = 1) buffer writeonly Sink { uint8_t data[]; } sink; + +void main() { + uvec2 pos = gl_GlobalInvocationID.xy; + if (pos.x * 2 >= params.width || pos.y >= params.height) return; + + + uint source_idx = pos.y * params.width * 3 / 2 + 3 * pos.x; + uint8_t a = source.data[source_idx + 0]; + uint8_t b = source.data[source_idx + 1]; + uint8_t c = source.data[source_idx + 2]; + + uint sink_idx = pos.y * params.width + 2 * pos.x; + sink.data[sink_idx + 0] = a; + sink.data[sink_idx + 1] = (b << 4) | (c >> 4); +} + +/* +layout(set = 0, binding = 0) buffer readonly Source { uint32_t data[]; } source; +layout(set = 0, binding = 1) buffer writeonly Sink { uint32_t data[]; } sink; + +void main() { + uvec2 pos = gl_GlobalInvocationID.xy; + + uint source_idx = pos.y * params.width * 3 / 2 + 3 * pos.x; + uint32_t a = source.data[source_idx + 0]; + uint32_t b = source.data[source_idx + 1]; + uint32_t c = source.data[source_idx + 2]; + + uint sink_idx = pos.y * params.width + 2 * pos.x; + sink.data[sink_idx + 0] = (a & 0xff000000) | ((a << 4) & 0x00ff0000) | ((a << 8) & 0x0000ff00) | ((b >> 20)); + sink.data[sink_idx + 1] = ((b << 16) & 0xff000000) | ((b << 20) & 0x00f00000) | ((c >> 12) & 0x000f0000) | ((c >> 8) & 0x0000ff00) | ((c >> 4) & 0x000000ff); +} +*/ diff --git a/src/nodes_gpu/calibrate.rs b/src/nodes_gpu/calibrate.rs index 1a603d5..c4cf6bf 100644 --- a/src/nodes_gpu/calibrate.rs +++ b/src/nodes_gpu/calibrate.rs @@ -1,7 +1,7 @@ use crate::pipeline_processing::{ buffers::GpuBuffer, - frame::Frame, - gpu_util::ensure_gpu_buffer_frame, + frame::{Frame, FrameInterpretation, Raw}, + gpu_util::ensure_gpu_buffer, node::{Caps, InputProcessingNode, NodeID, ProcessingNode, Request}, parametrizable::prelude::*, payload::Payload, @@ -115,12 +115,12 @@ impl ProcessingNode for Calibrate { async fn pull(&self, request: Request) -> Result { let input = self.input.pull(request).await?; 
- let (frame, fut) = ensure_gpu_buffer_frame(&input, self.queue.clone()) + let (frame, fut) = ensure_gpu_buffer::(&input, self.queue.clone()) .context("Wrong input format for Calibrate")?; let sink_buffer = DeviceLocalBuffer::<[u8]>::array( self.device.clone(), - frame.interpretation.required_bytes() as DeviceSize, + frame.interp.required_bytes() as DeviceSize, BufferUsage { storage_buffer: true, storage_texel_buffer: true, @@ -131,8 +131,8 @@ impl ProcessingNode for Calibrate { )?; let push_constants = compute_shader::ty::PushConstantData { - width: frame.interpretation.width as _, - height: frame.interpretation.height as _, + width: frame.interp.width as _, + height: frame.interp.height as _, }; let layout = self.pipeline.layout().set_layouts()[0].clone(); @@ -166,8 +166,8 @@ impl ProcessingNode for Calibrate { .push_constants(self.pipeline.layout().clone(), 0, push_constants) .bind_pipeline_compute(self.pipeline.clone()) .dispatch([ - (frame.interpretation.width as u32 + 15) / 16, - (frame.interpretation.height as u32 + 31) / 32, + (frame.interp.width as u32 + 15) / 16, + (frame.interp.height as u32 + 31) / 32, 1, ])?; let command_buffer = builder.build()?; @@ -176,10 +176,7 @@ impl ProcessingNode for Calibrate { fut.then_execute(self.queue.clone(), command_buffer)?.then_signal_fence_and_flush()?; future.wait(None).unwrap(); - Ok(Payload::from(Frame { - interpretation: frame.interpretation.clone(), - storage: GpuBuffer::from(sink_buffer), - })) + Ok(Payload::from(Frame { interp: frame.interp, storage: GpuBuffer::from(sink_buffer) })) } fn get_caps(&self) -> Caps { self.input.get_caps() } diff --git a/src/nodes_gpu/color_voodoo.glsl b/src/nodes_gpu/color_voodoo.glsl index 0905d61..d11e8e8 100644 --- a/src/nodes_gpu/color_voodoo.glsl +++ b/src/nodes_gpu/color_voodoo.glsl @@ -1,32 +1,51 @@ +#version 450 +#extension GL_EXT_shader_explicit_arithmetic_types: enable +#extension GL_EXT_shader_explicit_arithmetic_types_int8: require + +layout(local_size_x = 32, 
local_size_y = 32, local_size_z = 1) in; + layout(push_constant) uniform PushConstantData { - dtype pedestal; - dtype s_gamma; - dtype v_gamma; + float pedestal; + float s_gamma; + float v_gamma; + uint width; + uint height; } params; +layout(set = 0, binding = 0) buffer readonly Source { uint8_t data[]; } source; +layout(set = 0, binding = 1) buffer writeonly Sink { uint8_t data[]; } sink; // stolen from: https://stackoverflow.com/questions/15095909/from-rgb-to-hsv-in-opengl-glsl -dtype3 rgb2hsv(dtype3 c) { - dtype4 K = dtype4(0.0, -1.0 / 3.0, 2.0 / 3.0, -1.0); - dtype4 p = mix(dtype4(c.bg, K.wz), dtype4(c.gb, K.xy), step(c.b, c.g)); - dtype4 q = mix(dtype4(p.xyw, c.r), dtype4(c.r, p.yzx), step(p.x, c.r)); - - dtype d = q.x - min(q.w, q.y); - dtype e = 1.0e-10; - return dtype3(abs(q.z + (q.w - q.y) / (6.0 * d + e)), d / (q.x + e), q.x); +vec3 rgb2hsv(vec3 c) { + vec4 K = vec4(0.0, -1.0 / 3.0, 2.0 / 3.0, -1.0); + vec4 p = mix(vec4(c.bg, K.wz), vec4(c.gb, K.xy), step(c.b, c.g)); + vec4 q = mix(vec4(p.xyw, c.r), vec4(c.r, p.yzx), step(p.x, c.r)); + + float d = q.x - min(q.w, q.y); + float e = 1.0e-10; + return vec3(abs(q.z + (q.w - q.y) / (6.0 * d + e)), d / (q.x + e), q.x); } -dtype3 hsv2rgb(dtype3 c) { - dtype4 K = dtype4(1.0, 2.0 / 3.0, 1.0 / 3.0, 3.0); - dtype3 p = abs(fract(c.xxx + K.xyz) * 6.0 - K.www); + +vec3 hsv2rgb(vec3 c) { + vec4 K = vec4(1.0, 2.0 / 3.0, 1.0 / 3.0, 3.0); + vec3 p = abs(fract(c.xxx + K.xyz) * 6.0 - K.www); return c.z * mix(K.xxx, clamp(p - K.xxx, 0.0, 1.0), c.y); } -dtype3 produce_pixel(uvec2 pos) { - dtype3 rgb = read_pixel(pos); - rgb = (rgb - params.pedestal) / (1 - params.pedestal); - dtype3 hsv = rgb2hsv(rgb); +void main() { + uvec2 pos = gl_GlobalInvocationID.xy; + if (pos.x >= params.width || pos.y >= params.height) return; + uint idx = 3 * (params.width * pos.y + pos.x); + uint8_t r = source.data[idx + 0]; + uint8_t g = source.data[idx + 1]; + uint8_t b = source.data[idx + 2]; + vec3 rgb = (vec3(r, g, b) - params.pedestal) / (256 - 
params.pedestal); + vec3 hsv = rgb2hsv(rgb); hsv.g = pow(hsv.g, params.s_gamma); hsv.b = pow(hsv.b, params.v_gamma); - rgb = hsv2rgb(hsv); - return rgb; + rgb = hsv2rgb(hsv) * 256; + + sink.data[idx + 0] = uint8_t(rgb.r); + sink.data[idx + 1] = uint8_t(rgb.g); + sink.data[idx + 2] = uint8_t(rgb.b); } diff --git a/src/nodes_gpu/color_voodoo.rs b/src/nodes_gpu/color_voodoo.rs index 3491fb1..39a4f89 100644 --- a/src/nodes_gpu/color_voodoo.rs +++ b/src/nodes_gpu/color_voodoo.rs @@ -1,104 +1,146 @@ -use crate::{ - nodes_gpu::base_gpu_node::{BindingValue, GpuNode}, - pipeline_processing::{ - frame::{ColorInterpretation, FrameInterpretation}, - node::NodeID, - parametrizable::prelude::*, - processing_context::ProcessingContext, - }, +use crate::pipeline_processing::{ + buffers::GpuBuffer, + frame::{Frame, FrameInterpretation, Rgb}, + gpu_util::ensure_gpu_buffer, + node::{Caps, InputProcessingNode, NodeID, ProcessingNode, Request}, + parametrizable::prelude::*, + payload::Payload, + processing_context::ProcessingContext, +}; +use anyhow::{Context, Result}; +use async_trait::async_trait; +use std::sync::Arc; +use vulkano::{ + buffer::{BufferUsage, DeviceLocalBuffer}, + command_buffer::{AutoCommandBufferBuilder, CommandBufferUsage::OneTimeSubmit}, + descriptor_set::{persistent::PersistentDescriptorSet, WriteDescriptorSet}, + device::{Device, Queue}, + pipeline::{ComputePipeline, Pipeline, PipelineBindPoint}, + sync::GpuFuture, + DeviceSize, }; -use anyhow::{bail, Result}; -use std::collections::HashMap; +// generated by the macro +#[allow(clippy::needless_question_mark)] +mod compute_shader { + vulkano_shaders::shader! 
{ + ty: "compute", + path: "src/nodes_gpu/color_voodoo.glsl" + } +} pub struct ColorVoodoo { - pedestal: f32, - s_gamma: f32, - v_gamma: f32, + device: Arc, + pipeline: Arc, + queue: Arc, + input: InputProcessingNode, + pedestal: u8, + s_gamma: f64, + v_gamma: f64, } impl Parameterizable for ColorVoodoo { fn describe_parameters() -> ParametersDescriptor { ParametersDescriptor::new() - .with("pedestal", WithDefault(FloatRange(0.0, 1.0), FloatRangeValue(0.0))) + .with("input", Mandatory(NodeInputParameter)) + .with("pedestal", WithDefault(U8(), IntRangeValue(8))) .with("s_gamma", WithDefault(FloatRange(0.0, 100.0), FloatRangeValue(1.0))) .with("v_gamma", WithDefault(FloatRange(0.0, 100.0), FloatRangeValue(1.0))) } fn from_parameters( mut parameters: Parameters, _is_input_to: &[NodeID], - _context: &ProcessingContext, + context: &ProcessingContext, ) -> Result where Self: Sized, { + let (device, queues) = context.require_vulkan()?; + let queue = queues.iter().find(|&q| q.family().supports_compute()).unwrap().clone(); + + let shader = compute_shader::load(device.clone()).unwrap(); + let pipeline = ComputePipeline::new( + device.clone(), + shader.entry_point("main").unwrap(), + &(), + None, + |_| {}, + ) + .unwrap(); + Ok(ColorVoodoo { - pedestal: parameters.take("pedestal")?, + device, + pipeline, + queue, + input: parameters.take("input")?, + pedestal: parameters.take::("pedestal")? 
as u8, s_gamma: parameters.take("s_gamma")?, v_gamma: parameters.take("v_gamma")?, }) } } -impl GpuNode for ColorVoodoo { - fn get_glsl(&self) -> String { include_str!("./color_voodoo.glsl").to_string() } +#[async_trait] +impl ProcessingNode for ColorVoodoo { + async fn pull(&self, request: Request) -> Result { + let input = self.input.pull(request).await?; - fn get_binding( - &self, - frame_interpretation: &FrameInterpretation, - ) -> Result> { - if frame_interpretation.color_interpretation != ColorInterpretation::Rgb { - bail!("color_voodo node only supports rgb images") - } + let (frame, fut) = ensure_gpu_buffer::(&input, self.queue.clone()) + .context("Wrong input format for ColorVoodoo")?; - Ok(HashMap::from([ - ("pedestal".to_string(), BindingValue::F32(self.pedestal)), - ("s_gamma".to_string(), BindingValue::F32(self.s_gamma)), - ("v_gamma".to_string(), BindingValue::F32(self.v_gamma)), - ])) - } -} + let sink_buffer = DeviceLocalBuffer::<[u8]>::array( + self.device.clone(), + frame.interp.required_bytes() as DeviceSize, + BufferUsage { storage_buffer: true, storage_texel_buffer: true, ..BufferUsage::none() }, + std::iter::once(self.queue.family()), + )?; + let push_constants = compute_shader::ty::PushConstantData { + pedestal: self.pedestal as f32, + s_gamma: self.s_gamma as f32, + v_gamma: self.v_gamma as f32, + width: frame.interp.width as _, + height: frame.interp.height as _, + }; -#[cfg(test)] -mod tests { - use super::ColorVoodoo; - use crate::{ - nodes_gpu::base_gpu_node::GpuNodeImpl, - nodes_util::null_source::NullFrameSource, - pipeline_processing::{ - frame::{ColorInterpretation, Compression, FrameInterpretation, SampleInterpretation}, - node::{InputProcessingNode, NodeID, ProcessingNode, Request}, - parametrizable::{prelude::NodeInputValue, Parameterizable, Parameters}, - processing_context::ProcessingContext, - }, - }; - use std::{collections::HashMap, sync::Arc}; + let layout = self.pipeline.layout().set_layouts()[0].clone(); + let set = 
PersistentDescriptorSet::new( + layout, + [ + WriteDescriptorSet::buffer(0, frame.storage.untyped()), + WriteDescriptorSet::buffer(1, sink_buffer.clone()), + ], + ) + .unwrap(); - #[test] - fn test_basic_functionality_color_voodo() { - let context = ProcessingContext::default(); + let mut builder = AutoCommandBufferBuilder::primary( + self.device.clone(), + self.queue.family(), + OneTimeSubmit, + ) + .unwrap(); + builder + .bind_descriptor_sets( + PipelineBindPoint::Compute, + self.pipeline.layout().clone(), + 0, + set, + ) + .push_constants(self.pipeline.layout().clone(), 0, push_constants) + .bind_pipeline_compute(self.pipeline.clone()) + .dispatch([ + (frame.interp.width as u32 + 31) / 32, + (frame.interp.height as u32 + 31) / 32, + 1, + ])?; + let command_buffer = builder.build()?; - let source = NodeInputValue(InputProcessingNode::new( - NodeID::default(), - Arc::new(NullFrameSource { - context: context.clone(), - interpretation: FrameInterpretation { - width: 1920, - height: 1080, - fps: Some(24.0), - color_interpretation: ColorInterpretation::Rgb, - sample_interpretation: SampleInterpretation::FP16, - compression: Compression::Uncompressed, - }, - }), - )); - let parameters = Parameters::new(HashMap::from([("input".to_string(), source)])) - .add_defaults(GpuNodeImpl::::describe_parameters()); - let dut = GpuNodeImpl::::from_parameters(parameters, &[], &context).unwrap(); + let future = + fut.then_execute(self.queue.clone(), command_buffer)?.then_signal_fence_and_flush()?; - for _ in 0..10 { - let _payload = pollster::block_on(dut.pull(Request::new(0, 0))).unwrap(); - } + future.wait(None).unwrap(); + Ok(Payload::from(Frame { interp: frame.interp, storage: GpuBuffer::from(sink_buffer) })) } + + fn get_caps(&self) -> Caps { self.input.get_caps() } } diff --git a/src/nodes_gpu/debayer.glsl b/src/nodes_gpu/debayer.glsl index cf02f22..ad8b539 100644 --- a/src/nodes_gpu/debayer.glsl +++ b/src/nodes_gpu/debayer.glsl @@ -1,4 +1,25 @@ -dtype3 produce_pixel(uvec2 pos) 
{ +#version 450 +#extension GL_EXT_shader_explicit_arithmetic_types: enable +#extension GL_EXT_shader_explicit_arithmetic_types_int8: require + +layout(local_size_x = 32, local_size_y = 32, local_size_z = 1) in; + +layout(push_constant) uniform PushConstantData { + uint width; + uint height; + +// these are actual coordinates of the first red pixel (unlike everywhere else) + uint first_red_x; + uint first_red_y; +} params; + +layout(set = 0, binding = 0) buffer readonly Source { uint8_t data[]; } source; +layout(set = 0, binding = 1) buffer writeonly Sink { uint8_t data[]; } sink; + +void main() { + uvec2 pos = gl_GlobalInvocationID.xy; + if (pos.x >= params.width || pos.y >= params.height) return; + /* variables a-i are the neighbour pixels (we are e) a b c @@ -6,15 +27,15 @@ dtype3 produce_pixel(uvec2 pos) { g h i */ - dtype a = read_pixel(pos + uvec2(-1, -1)); - dtype b = read_pixel(pos + uvec2( 0, -1)); - dtype c = read_pixel(pos + uvec2(+1, -1)); - dtype d = read_pixel(pos + uvec2(-1, 0)); - dtype e = read_pixel(pos + uvec2( 0, 0)); - dtype f = read_pixel(pos + uvec2(+1, 0)); - dtype g = read_pixel(pos + uvec2(-1, +1)); - dtype h = read_pixel(pos + uvec2( 0, +1)); - dtype i = read_pixel(pos + uvec2(+1, +1)); + float a = float(source.data[(pos.x - 1) + (pos.y - 1) * params.width]); + float b = float(source.data[(pos.x ) + (pos.y - 1) * params.width]); + float c = float(source.data[(pos.x + 1) + (pos.y - 1) * params.width]); + float d = float(source.data[(pos.x - 1) + (pos.y ) * params.width]); + float e = float(source.data[(pos.x ) + (pos.y ) * params.width]); + float f = float(source.data[(pos.x + 1) + (pos.y ) * params.width]); + float g = float(source.data[(pos.x - 1) + (pos.y + 1) * params.width]); + float h = float(source.data[(pos.x ) + (pos.y + 1) * params.width]); + float i = float(source.data[(pos.x + 1) + (pos.y + 1) * params.width]); vec3 red_pixel = vec3( e, @@ -37,10 +58,10 @@ dtype3 produce_pixel(uvec2 pos) { (d + f) / 2. 
); - float x_red = float((pos.x + uint(!CFA_RED_IN_FIRST_COL) + 1) % 2); - float x_red_not = float((pos.x + uint(!CFA_RED_IN_FIRST_COL)) % 2); - float y_red = float((pos.y + uint(!CFA_RED_IN_FIRST_ROW) + 1) % 2); - float y_red_not = float((pos.y + uint(!CFA_RED_IN_FIRST_ROW)) % 2); + float x_red = float((pos.x + params.first_red_x + 1) % 2); + float x_red_not = float((pos.x + params.first_red_x) % 2); + float y_red = float((pos.y + params.first_red_y + 1) % 2); + float y_red_not = float((pos.y + params.first_red_y) % 2); vec3 rgb = ( + red_pixel * x_red * y_red @@ -49,6 +70,8 @@ dtype3 produce_pixel(uvec2 pos) { + green_pixel_blue_row * x_red * y_red_not ); - return rgb; + sink.data[(pos.y * params.width + pos.x) * 3 + 0] = uint8_t(rgb.r); + sink.data[(pos.y * params.width + pos.x) * 3 + 1] = uint8_t(rgb.g); + sink.data[(pos.y * params.width + pos.x) * 3 + 2] = uint8_t(rgb.b); } diff --git a/src/nodes_gpu/debayer.rs b/src/nodes_gpu/debayer.rs index 2e49c9f..de5a660 100644 --- a/src/nodes_gpu/debayer.rs +++ b/src/nodes_gpu/debayer.rs @@ -1,85 +1,143 @@ -use crate::{ - nodes_gpu::base_gpu_node::{BindingValue, GpuNode}, - pipeline_processing::frame::{ColorInterpretation, FrameInterpretation, SampleInterpretation}, +use crate::pipeline_processing::{ + buffers::GpuBuffer, + frame::{Frame, FrameInterpretation, Raw, Rgb}, + gpu_util::ensure_gpu_buffer, + node::{Caps, InputProcessingNode, NodeID, ProcessingNode, Request}, + parametrizable::prelude::*, + payload::Payload, + processing_context::ProcessingContext, +}; +use anyhow::{anyhow, Context, Result}; +use async_trait::async_trait; +use std::sync::Arc; +use vulkano::{ + buffer::{BufferUsage, DeviceLocalBuffer}, + command_buffer::{AutoCommandBufferBuilder, CommandBufferUsage::OneTimeSubmit}, + descriptor_set::{persistent::PersistentDescriptorSet, WriteDescriptorSet}, + device::{Device, Queue}, + pipeline::{ComputePipeline, Pipeline, PipelineBindPoint}, + sync::GpuFuture, + DeviceSize, }; -use anyhow::{bail, Result}; -use 
std::collections::HashMap; +// generated by the macro +#[allow(clippy::needless_question_mark)] +mod compute_shader { + vulkano_shaders::shader! { + ty: "compute", + path: "src/nodes_gpu/debayer.glsl" + } +} -#[derive(Default)] -pub struct Debayer {} -impl GpuNode for Debayer { - fn get_glsl(&self) -> String { include_str!("./debayer.glsl").to_string() } +pub struct Debayer { + device: Arc, + pipeline: Arc, + queue: Arc, + input: InputProcessingNode, +} - fn get_binding( - &self, - frame_interpretation: &FrameInterpretation, - ) -> Result> { - match frame_interpretation.color_interpretation { - ColorInterpretation::Bayer(cfa) => Ok(HashMap::from([ - ("cfa.red_in_first_col".to_string(), BindingValue::U32(cfa.red_in_first_col as _)), - ("cfa.red_in_first_row".to_string(), BindingValue::U32(cfa.red_in_first_row as _)), - ])), - unsupported => bail!("expected bayer input found {unsupported:?}"), - } +impl Parameterizable for Debayer { + fn describe_parameters() -> ParametersDescriptor { + ParametersDescriptor::default().with("input", Mandatory(NodeInputParameter)) } + fn from_parameters( + mut parameters: Parameters, + _is_input_to: &[NodeID], + context: &ProcessingContext, + ) -> Result + where + Self: Sized, + { + let (device, queues) = context.require_vulkan()?; + let queue = queues.iter().find(|&q| q.family().supports_compute()).unwrap().clone(); - fn get_interpretation(&self, frame_interpretation: FrameInterpretation) -> FrameInterpretation { - FrameInterpretation { - color_interpretation: ColorInterpretation::Rgb, - sample_interpretation: SampleInterpretation::UInt(8), - ..frame_interpretation - } + let shader = compute_shader::load(device.clone()).unwrap(); + let pipeline = ComputePipeline::new( + device.clone(), + shader.entry_point("main").unwrap(), + &(), + None, + |_| {}, + ) + .unwrap(); + + Ok(Debayer { device, pipeline, queue, input: parameters.take("input")? 
}) } } -#[cfg(test)] -mod tests { - use super::Debayer; - use crate::{ - nodes_gpu::base_gpu_node::GpuNodeImpl, - nodes_util::null_source::NullFrameSource, - pipeline_processing::{ - frame::{ - CfaDescriptor, - ColorInterpretation, - Compression, - FrameInterpretation, - SampleInterpretation, +#[async_trait] +impl ProcessingNode for Debayer { + async fn pull(&self, request: Request) -> Result { + let input = self.input.pull(request).await?; + + let (frame, fut) = ensure_gpu_buffer::(&input, self.queue.clone()) + .context("Wrong input format for Debayer")?; + + if frame.interp.bit_depth != 8 { + return Err(anyhow!( + "A frame with bit_depth=8 is required. Convert the bit depth of the frame!" + )); + } + + let interp = + Rgb { width: frame.interp.width, height: frame.interp.height, fps: frame.interp.fps }; + let sink_buffer = DeviceLocalBuffer::<[u8]>::array( + self.device.clone(), + interp.required_bytes() as DeviceSize, + BufferUsage { + storage_buffer: true, + storage_texel_buffer: true, + transfer_src: true, + ..BufferUsage::none() }, - node::{InputProcessingNode, NodeID, ProcessingNode, Request}, - parametrizable::{prelude::NodeInputValue, Parameterizable, Parameters}, - processing_context::ProcessingContext, - }, - }; - use std::{collections::HashMap, sync::Arc}; + std::iter::once(self.queue.family()), + )?; - #[test] - fn test_basic_functionality_debayer() { - let context = ProcessingContext::default(); + let push_constants = compute_shader::ty::PushConstantData { + width: frame.interp.width as u32, + height: frame.interp.height as u32, + first_red_x: (!frame.interp.cfa.red_in_first_col) as u32, + first_red_y: (!frame.interp.cfa.red_in_first_row) as u32, + }; - let source = NodeInputValue(InputProcessingNode::new( - NodeID::default(), - Arc::new(NullFrameSource { - context: context.clone(), - interpretation: FrameInterpretation { - width: 1920, - height: 1080, - fps: Some(24.0), - color_interpretation: ColorInterpretation::Bayer(CfaDescriptor { - 
red_in_first_col: false, - red_in_first_row: false, - }), - sample_interpretation: SampleInterpretation::FP16, - compression: Compression::Uncompressed, - }, - }), - )); - let parameters = Parameters::new(HashMap::from([("input".to_string(), source)])); - let dut = GpuNodeImpl::::from_parameters(parameters, &[], &context).unwrap(); + let layout = self.pipeline.layout().set_layouts()[0].clone(); + let set = PersistentDescriptorSet::new( + layout, + [ + WriteDescriptorSet::buffer(0, frame.storage.untyped()), + WriteDescriptorSet::buffer(1, sink_buffer.clone()), + ], + ) + .unwrap(); - for _ in 0..10 { - let _payload = pollster::block_on(dut.pull(Request::new(0, 0))).unwrap(); - } + let mut builder = AutoCommandBufferBuilder::primary( + self.device.clone(), + self.queue.family(), + OneTimeSubmit, + ) + .unwrap(); + builder + .bind_descriptor_sets( + PipelineBindPoint::Compute, + self.pipeline.layout().clone(), + 0, + set, + ) + .push_constants(self.pipeline.layout().clone(), 0, push_constants) + .bind_pipeline_compute(self.pipeline.clone()) + .dispatch([ + (frame.interp.width + 31) as u32 / 32, + (frame.interp.height as u32 + 31) / 32, + 1, + ])?; + let command_buffer = builder.build()?; + + let future = + fut.then_execute(self.queue.clone(), command_buffer)?.then_signal_fence_and_flush()?; + + future.wait(None).unwrap(); + Ok(Payload::from(Frame { interp, storage: GpuBuffer::from(sink_buffer) })) } + + fn get_caps(&self) -> Caps { self.input.get_caps() } } diff --git a/src/nodes_gpu/display.rs b/src/nodes_gpu/display.rs index a0e24ca..7f15fcd 100644 --- a/src/nodes_gpu/display.rs +++ b/src/nodes_gpu/display.rs @@ -1,6 +1,6 @@ use crate::pipeline_processing::{ frame::Rgb, - gpu_util::ensure_gpu_buffer_frame, + gpu_util::ensure_gpu_buffer, node::{InputProcessingNode, NodeID, ProgressUpdate, SinkNode}, parametrizable::prelude::*, processing_context::ProcessingContext, @@ -145,7 +145,7 @@ impl Parameterizable for Display { .with("mailbox", Optional(BoolParameter)) 
.with("live", Optional(BoolParameter)) .with("loop", Optional(BoolParameter)) - .with("priority", WithDefault(U8(), ParameterValue::IntRangeValue(0))) + .with("priority", Optional(U8())) .with("fullscreen", Optional(BoolParameter)) } @@ -155,10 +155,10 @@ impl Parameterizable for Display { _context: &ProcessingContext, ) -> Result { Ok(Self { - mailbox: parameters.has("mailbox"), - live: parameters.has("live"), - do_loop: parameters.has("loop"), - fullscreen: parameters.has("fullscreen"), + mailbox: parameters.take("mailbox")?, + live: parameters.take("live")?, + do_loop: parameters.take("loop")?, + fullscreen: parameters.take("fullscreen")?, input: parameters.take("input")?, priority: parameters.take("priority")?, }) @@ -316,15 +316,13 @@ impl SinkNode for Display { *control_flow = ControlFlow::Exit } Ok(ref frame) => { - let (frame, fut) = - ensure_gpu_buffer_frame::(frame, queue.clone()) - .context("Wrong input format for Display") - .unwrap(); - frame_width = frame.interpretation.width as _; - frame_height = frame.interpretation.height as _; - - next_frame_time += - Duration::from_secs_f64(1.0 / frame.interpretation.fps); + let (frame, fut) = ensure_gpu_buffer::(frame, queue.clone()) + .context("Wrong input format for Display") + .unwrap(); + frame_width = frame.interp.width as _; + frame_height = frame.interp.height as _; + + next_frame_time += Duration::from_secs_f64(1.0 / frame.interp.fps); source_buffer = Some(frame); source_future = Some(fut); } diff --git a/src/nodes_gpu/histogram.rs b/src/nodes_gpu/histogram.rs index 5d0aaf6..2d59e68 100644 --- a/src/nodes_gpu/histogram.rs +++ b/src/nodes_gpu/histogram.rs @@ -1,7 +1,7 @@ use crate::pipeline_processing::{ buffers::GpuBuffer, - frame::{Frame, FrameInterpretation, SampleInterpretation}, - gpu_util::ensure_gpu_buffer_frame, + frame::{Frame, Raw}, + gpu_util::ensure_gpu_buffer, node::{Caps, InputProcessingNode, NodeID, ProcessingNode, Request}, parametrizable::prelude::*, payload::Payload, @@ -70,13 +70,12 @@ 
impl ProcessingNode for Histogram { async fn pull(&self, request: Request) -> Result { let input = self.input.pull(request).await?; - let (frame, fut) = ensure_gpu_buffer_frame(&input, self.queue.clone()) + let (frame, fut) = ensure_gpu_buffer::(&input, self.queue.clone()) .context("Wrong input format for Histogram")?; - let sink_buffer = DeviceLocalBuffer::<[u8]>::array( self.device.clone(), - (1 << 8) * 4, // actually uint + (1 << frame.interp.bit_depth) * 4, // actually uint BufferUsage { storage_buffer: true, storage_texel_buffer: true, @@ -89,8 +88,8 @@ impl ProcessingNode for Histogram { )?; let push_constants = compute_shader::ty::PushConstantData { - width: frame.interpretation.width as _, - height: frame.interpretation.height as _, + width: frame.interp.width as _, + height: frame.interp.height as _, }; let layout = self.pipeline.layout().set_layouts()[0].clone(); @@ -123,8 +122,8 @@ impl ProcessingNode for Histogram { .push_constants(self.pipeline.layout().clone(), 0, push_constants) .bind_pipeline_compute(self.pipeline.clone()) .dispatch([ - (frame.interpretation.width as u32 + 15) / 16, - (frame.interpretation.height as u32 + 31) / 32, + (frame.interp.width as u32 + 15) / 16, + (frame.interp.height as u32 + 31) / 32, 1, ])?; let command_buffer = builder.build()?; @@ -134,11 +133,12 @@ impl ProcessingNode for Histogram { future.wait(None).unwrap(); Ok(Payload::from(Frame { - interpretation: FrameInterpretation { + interp: Raw { width: 4096, height: 1, - sample_interpretation: SampleInterpretation::FP32, - ..frame.interpretation.clone() + bit_depth: 32, + cfa: frame.interp.cfa, + fps: frame.interp.fps, }, storage: GpuBuffer::from(sink_buffer), })) diff --git a/src/nodes_gpu/lut_3d.glsl b/src/nodes_gpu/lut_3d.glsl new file mode 100644 index 0000000..034d151 --- /dev/null +++ b/src/nodes_gpu/lut_3d.glsl @@ -0,0 +1,30 @@ +#version 450 +#extension GL_EXT_shader_explicit_arithmetic_types: enable +#extension GL_EXT_shader_explicit_arithmetic_types_int8: require 
+ +layout(local_size_x = 32, local_size_y = 32, local_size_z = 1) in; + +layout(push_constant) uniform PushConstantData { + uint width; + uint height; +} params; + +layout(set = 0, binding = 0) buffer readonly Source { uint8_t data[]; } source; +layout(set = 0, binding = 1) buffer writeonly Sink { uint8_t data[]; } sink; +layout(set = 0, binding = 2) uniform sampler3D lut_sampler; + +void main() { + uvec2 pos = gl_GlobalInvocationID.xy; + if (pos.x >= params.width || pos.y >= params.height) return; + uint idx = 3 * (params.width * pos.y + pos.x); + uint8_t r = source.data[idx + 0]; + uint8_t g = source.data[idx + 1]; + uint8_t b = source.data[idx + 2]; + + vec3 orig_rgb = vec3(r, g, b) / 255.0; + vec4 rgba = texture(lut_sampler, orig_rgb.rgb) * 255.0; + + sink.data[idx + 0] = uint8_t(rgba.r); + sink.data[idx + 1] = uint8_t(rgba.g); + sink.data[idx + 2] = uint8_t(rgba.b); +} diff --git a/src/nodes_gpu/lut_3d.rs b/src/nodes_gpu/lut_3d.rs index e4f8ec0..676b0ea 100644 --- a/src/nodes_gpu/lut_3d.rs +++ b/src/nodes_gpu/lut_3d.rs @@ -1,28 +1,47 @@ -use crate::{ - nodes_gpu::base_gpu_node::{BindingValue, GpuNode}, - pipeline_processing::{ - frame::{ColorInterpretation, FrameInterpretation}, - node::NodeID, - parametrizable::prelude::*, - processing_context::ProcessingContext, - }, +use crate::pipeline_processing::{ + buffers::GpuBuffer, + frame::{Frame, FrameInterpretation, Rgb}, + gpu_util::ensure_gpu_buffer, + node::{Caps, InputProcessingNode, NodeID, ProcessingNode, Request}, + parametrizable::prelude::*, + payload::Payload, + processing_context::ProcessingContext, }; -use anyhow::{anyhow, bail, Result}; -use indoc::indoc; +use anyhow::{anyhow, Context, Result}; +use async_trait::async_trait; use std::{ - collections::HashMap, fs::File, io::{BufReader, Read}, sync::Arc, }; use vulkano::{ - device::Queue, - image::{view::ImageView, ImmutableImage}, + buffer::{BufferUsage, DeviceLocalBuffer}, + command_buffer::{AutoCommandBufferBuilder, 
CommandBufferUsage::OneTimeSubmit}, + descriptor_set::{persistent::PersistentDescriptorSet, WriteDescriptorSet}, + device::{Device, Queue}, + image::{view::ImageView, ImageViewAbstract, ImmutableImage}, + pipeline::{ComputePipeline, Pipeline, PipelineBindPoint}, sampler::{Filter, Sampler, SamplerAddressMode, SamplerCreateInfo}, + sync::GpuFuture, + DeviceSize, }; +// generated by the macro +#[allow(clippy::needless_question_mark)] +mod compute_shader { + vulkano_shaders::shader! { + ty: "compute", + path: "src/nodes_gpu/lut_3d.glsl" + } +} + pub struct Lut3d { - sampler: BindingValue, + device: Arc, + pipeline: Arc, + queue: Arc, + input: InputProcessingNode, + lut_image_view: Arc, + lut_sampler: Arc, } impl Parameterizable for Lut3d { @@ -43,6 +62,16 @@ impl Parameterizable for Lut3d { let (device, queues) = context.require_vulkan()?; let queue = queues.iter().find(|&q| q.family().supports_compute()).unwrap().clone(); + let shader = compute_shader::load(device.clone()).unwrap(); + let pipeline = ComputePipeline::new( + device.clone(), + shader.entry_point("main").unwrap(), + &(), + None, + |_| {}, + ) + .unwrap(); + let lut_image = read_lut_texture_from_cube_file(parameters.take("file")?, queue.clone())?; let lut_sampler = Sampler::new( device.clone(), @@ -59,10 +88,14 @@ impl Parameterizable for Lut3d { ) .unwrap(); - let sampler = - BindingValue::Sampler((ImageView::new_default(lut_image).unwrap(), lut_sampler)); - - Ok(Lut3d { sampler }) + Ok(Lut3d { + device, + pipeline, + queue, + input: parameters.take("input")?, + lut_image_view: ImageView::new_default(lut_image).unwrap(), + lut_sampler, + }) } } @@ -159,79 +192,74 @@ fn read_lut_texture_from_cube_file(path: String, queue: Arc) -> Result String { - indoc!( - " - layout(...) 
uniform sampler3D lut_sampler; +#[async_trait] +impl ProcessingNode for Lut3d { + async fn pull(&self, request: Request) -> Result { + let input = self.input.pull(request).await?; - dtype3 produce_pixel(uvec2 pos) { - return dtype3(texture(lut_sampler, read_pixel(pos))); - } - " - ) - .to_string() - } + let (frame, fut) = ensure_gpu_buffer::(&input, self.queue.clone()) + .context("Wrong input forma for Lut3d")?; - fn get_binding( - &self, - frame_interpretation: &FrameInterpretation, - ) -> Result> { - if frame_interpretation.color_interpretation != ColorInterpretation::Rgb { - bail!("Lut3d node only supports rgb images") - } + let sink_buffer = DeviceLocalBuffer::<[u8]>::array( + self.device.clone(), + frame.interp.required_bytes() as DeviceSize, + BufferUsage { + storage_buffer: true, + storage_texel_buffer: true, + transfer_src: true, + ..BufferUsage::none() + }, + std::iter::once(self.queue.family()), + )?; - Ok(HashMap::from([("lut_sampler".to_string(), self.sampler.clone())])) - } -} + let push_constants = compute_shader::ty::PushConstantData { + width: frame.interp.width as _, + height: frame.interp.height as _, + }; + let layout = self.pipeline.layout().set_layouts()[0].clone(); + let set = PersistentDescriptorSet::new( + layout, + [ + WriteDescriptorSet::buffer(0, frame.storage.untyped()), + WriteDescriptorSet::buffer(1, sink_buffer.clone()), + WriteDescriptorSet::image_view_sampler( + 2, + self.lut_image_view.clone(), + self.lut_sampler.clone(), + ), + ], + ) + .unwrap(); -#[cfg(test)] -mod tests { - use super::Lut3d; - use crate::{ - nodes_gpu::base_gpu_node::GpuNodeImpl, - nodes_util::null_source::NullFrameSource, - pipeline_processing::{ - frame::{ColorInterpretation, Compression, FrameInterpretation, SampleInterpretation}, - node::{InputProcessingNode, NodeID, ProcessingNode, Request}, - parametrizable::{ - prelude::{NodeInputValue, StringValue}, - Parameterizable, - Parameters, - }, - processing_context::ProcessingContext, - }, - }; - use 
std::{collections::HashMap, sync::Arc}; - - #[test] - fn test_basic_functionality_lut3d() { - let context = ProcessingContext::default(); - - let source = NodeInputValue(InputProcessingNode::new( - NodeID::default(), - Arc::new(NullFrameSource { - context: context.clone(), - interpretation: FrameInterpretation { - width: 1920, - height: 1080, - fps: Some(24.0), - color_interpretation: ColorInterpretation::Rgb, - sample_interpretation: SampleInterpretation::FP16, - compression: Compression::Uncompressed, - }, - }), - )); - let parameters = Parameters::new(HashMap::from([ - ("input".to_string(), source), - ("file".to_string(), StringValue("/Users/anuejn/Library/Containers/com.blackmagic-design.DaVinciResolveLite/Data/Library/Application Support/LUT/Film Looks/Rec709 Kodak 2383 D60.cube".to_string())), - ])) - .add_defaults(GpuNodeImpl::::describe_parameters()); - let dut = GpuNodeImpl::::from_parameters(parameters, &[], &context).unwrap(); - - for _ in 0..10 { - let _payload = pollster::block_on(dut.pull(Request::new(0, 0))).unwrap(); - } + let mut builder = AutoCommandBufferBuilder::primary( + self.device.clone(), + self.queue.family(), + OneTimeSubmit, + ) + .unwrap(); + builder + .bind_descriptor_sets( + PipelineBindPoint::Compute, + self.pipeline.layout().clone(), + 0, + set, + ) + .push_constants(self.pipeline.layout().clone(), 0, push_constants) + .bind_pipeline_compute(self.pipeline.clone()) + .dispatch([ + (frame.interp.width as u32 + 31) / 32, + (frame.interp.height as u32 + 31) / 32, + 1, + ])?; + let command_buffer = builder.build()?; + + let future = + fut.then_execute(self.queue.clone(), command_buffer)?.then_signal_fence_and_flush()?; + + future.wait(None).unwrap(); + Ok(Payload::from(Frame { interp: frame.interp, storage: GpuBuffer::from(sink_buffer) })) } + + fn get_caps(&self) -> Caps { self.input.get_caps() } } diff --git a/src/nodes_gpu/mod.rs b/src/nodes_gpu/mod.rs index 7ca3705..e1638d1 100644 --- a/src/nodes_gpu/mod.rs +++ b/src/nodes_gpu/mod.rs 
@@ -1,3 +1,4 @@ +pub mod bitdepth_convert; pub mod calibrate; pub mod color_voodoo; pub mod debayer; @@ -8,8 +9,6 @@ pub mod lut_3d; #[cfg(target_os = "linux")] pub mod display; -pub mod base_gpu_node; pub mod histogram; #[cfg(target_os = "linux")] pub mod plot; -mod shader_util; diff --git a/src/nodes_gpu/plot.rs b/src/nodes_gpu/plot.rs index 0105938..3f69c5e 100644 --- a/src/nodes_gpu/plot.rs +++ b/src/nodes_gpu/plot.rs @@ -1,6 +1,6 @@ use crate::pipeline_processing::{ - frame::Bayer, - gpu_util::ensure_gpu_buffer_frame, + frame::Raw, + gpu_util::ensure_gpu_buffer, node::{InputProcessingNode, NodeID, ProgressUpdate, SinkNode}, parametrizable::prelude::*, processing_context::ProcessingContext, @@ -116,7 +116,7 @@ impl Parameterizable for Plot { .with("live", Optional(BoolParameter)) .with("fullscreen", Optional(BoolParameter)) .with("input", Mandatory(NodeInputParameter)) - .with("priority", WithDefault(U8(), ParameterValue::IntRangeValue(0))) + .with("priority", Optional(U8())) } fn from_parameters( @@ -125,11 +125,11 @@ impl Parameterizable for Plot { _context: &ProcessingContext, ) -> Result { Ok(Self { - mailbox: parameters.has("mailbox"), - live: parameters.has("live"), - fullscreen: parameters.has("fullscreen"), - input: parameters.take("input"), - priority: parameters.take("priority"), + mailbox: parameters.take("mailbox")?, + live: parameters.take("live")?, + fullscreen: parameters.take("fullscreen")?, + input: parameters.take("input")?, + priority: parameters.take("priority")?, }) } } @@ -295,10 +295,10 @@ impl SinkNode for Plot { } Ok(ref frame) => { let cpu_frame = context - .ensure_cpu_buffer_frame::(frame) + .ensure_cpu_buffer::(frame) .context("Wrong input format for Plot") .unwrap(); - let count = cpu_frame.interpretation.width as _; + let count = cpu_frame.interp.width as _; cpu_frame.storage.as_slice(|frame| { let frame = bytemuck::cast_slice::(frame); @@ -319,12 +319,10 @@ impl SinkNode for Plot { point_count = end - start; }); - let (frame, fut) 
= - ensure_gpu_buffer_frame::(frame, queue.clone()) - .context("Wrong input format for Plot") - .unwrap(); - next_frame_time += - Duration::from_secs_f64(1.0 / frame.interpretation.fps); + let (frame, fut) = ensure_gpu_buffer::(frame, queue.clone()) + .context("Wrong input format for Plot") + .unwrap(); + next_frame_time += Duration::from_secs_f64(1.0 / frame.interp.fps); source_buffer = Some(frame); source_future = Some(fut); diff --git a/src/nodes_gpu/shader_util.rs b/src/nodes_gpu/shader_util.rs deleted file mode 100644 index 03ba48d..0000000 --- a/src/nodes_gpu/shader_util.rs +++ /dev/null @@ -1,279 +0,0 @@ -use crate::pipeline_processing::frame::{ - ColorInterpretation, - FrameInterpretation, - SampleInterpretation, -}; -use anyhow::{bail, Result}; -use indoc::{formatdoc, indoc}; -use regex::Regex; -use shaderc::CompilationArtifact; - - -pub fn compile_shader(shader_code: &str) -> Result { - let compiler = shaderc::Compiler::new().unwrap(); - let mut options = shaderc::CompileOptions::new().unwrap(); - options.add_macro_definition("dtype", Some("float")); - options.add_macro_definition("dtype2", Some("vec2")); - options.add_macro_definition("dtype3", Some("vec3")); - options.add_macro_definition("dtype4", Some("vec4")); - let spirv = compiler.compile_into_spirv( - &shader_code, - shaderc::ShaderKind::Compute, - "shader.glsl", - "main", - Some(&options), - )?; - Ok(spirv) -} - -pub fn generate_single_node_shader( - node_shader: String, - input_interpretation: FrameInterpretation, - output_interpretation: FrameInterpretation, -) -> Result { - let mut shader_code = String::new(); - shader_code.push_str(indoc!( - " - #version 450 - #extension GL_EXT_shader_explicit_arithmetic_types: enable - #extension GL_EXT_shader_explicit_arithmetic_types_int8: require - #extension GL_EXT_shader_explicit_arithmetic_types_float16: require - - layout(local_size_x = 256, local_size_y = 4, local_size_z = 1) in; - " - )); - 
shader_code.push_str(read_sample_function(input_interpretation.sample_interpretation)?); - shader_code.push_str(write_sample_function(output_interpretation.sample_interpretation)?); - shader_code.push_str(&formatdoc!( - " - #define IN_WIDTH {} - #define IN_HEIGHT {} - #define WIDTH {} - #define HEIGHT {} - ", - input_interpretation.width, - input_interpretation.height, - output_interpretation.width, - output_interpretation.height - )); - shader_code.push_str(&read_pixel_function(input_interpretation.color_interpretation)?); - shader_code.push_str(&write_pixel_function(output_interpretation.color_interpretation)?); - shader_code.push_str(&node_shader); - shader_code.push_str(indoc!( - " - void main() { - uvec2 pos = gl_GlobalInvocationID.xy; - if (pos.x >= WIDTH || pos.y >= HEIGHT) return; - write_pixel(pos, produce_pixel(pos)); - } - " - )); - - let shader_code = assign_set_and_binding(&shader_code); - - Ok(shader_code) -} - -/// replaces the layout(...) placeholder with consecutive layout(set = 0, -/// binding = n) strings -fn assign_set_and_binding(input: &str) -> String { - let re = Regex::new(r"layout\(\.\.\.\)").unwrap(); - let mut str = String::new(); - let mut non_matches = re.split(input); - str += non_matches.next().unwrap(); - for (n, part) in non_matches.enumerate() { - str += &format!("layout(set = 0, binding = {n}){part}"); - } - return str; -} - -fn read_sample_function(si: SampleInterpretation) -> Result<&'static str> { - match si { - SampleInterpretation::UInt(bits) => match bits { - 8 => Ok(indoc!( - " - layout(...) buffer readonly Source { uint8_t data[]; } source; - - dtype read_sample(uint i) { - return dtype(source.data[i]) / 255.0; - } - " - )), - 12 => Ok(indoc!( - " - layout(...) 
buffer readonly Source { uint8_t data[]; } source; - - - dtype read_sample(uint i) { - uint source_idx = i / 2 * 3; - - uint v; - if (i % 2 == 0) { - uint8_t a = source.data[source_idx + 0]; - uint8_t b = source.data[source_idx + 1]; - v = (a << 4) | (b & 0xf0); - } else { - uint8_t b = source.data[source_idx + 1]; - uint8_t c = source.data[source_idx + 2]; - v = ((b & 0x0f) << 8) | c; - } - - return dtype(v) / 4095.0; - } - " - )), - 16 => Ok(indoc!( - " - layout(...) buffer readonly Source { uint16_t data[]; } source; - - dtype read_sample(uint i) { - return dtype(source.data[i]) / 65535.0; - } - " - )), - _ => bail!("bit depth {bits} is not implemented for input :("), - }, - SampleInterpretation::FP16 => Ok(indoc!( - " - layout(...) buffer readonly Source { float16_t data[]; } source; - - dtype read_sample(uint i) { - return dtype(source.data[i]); - } - " - )), - SampleInterpretation::FP32 => Ok(indoc!( - " - layout(...) buffer readonly Source { float data[]; } source; - - dtype read_sample(uint i) { - return dtype(source.data[i]); - } - " - )), - } -} -fn write_sample_function(si: SampleInterpretation) -> Result<&'static str> { - match si { - SampleInterpretation::UInt(bits) => match bits { - 8 => Ok(indoc!( - " - layout(...) buffer writeonly Sink { uint8_t data[]; } sink; - - void write_sample(uint i, dtype v) { - sink.data[i] = uint8_t(v * 255.0); - } - " - )), - 16 => Ok(indoc!( - " - layout(...) buffer writeonly Sink { uint16_t data[]; } sink; - - void write_sample(uint i, dtype v) { - sink.data[i] = uint16_t(v * 65535.0); - } - " - )), - _ => bail!("bit depth {bits} is not implemented for output :("), - }, - SampleInterpretation::FP16 => Ok(indoc!( - " - layout(...) buffer writeonly Sink { float16_t data[]; } sink; - - void write_sample(uint i, dtype v) { - sink.data[i] = float16_t(v); - } - " - )), - SampleInterpretation::FP32 => Ok(indoc!( - " - layout(...) 
buffer writeonly Sink { float data[]; } sink; - - void write_sample(uint i, dtype v) { - sink.data[i] = float(v); - } - " - )), - } -} - -fn read_pixel_function(ci: ColorInterpretation) -> Result { - match ci { - ColorInterpretation::Bayer(cfa) => Ok(formatdoc!( - " - #define CFA_RED_IN_FIRST_ROW {} - #define CFA_RED_IN_FIRST_COL {} - - dtype read_pixel(uvec2 pos) {{ - return read_sample(pos.y * IN_WIDTH + pos.x); - }} - ", - cfa.red_in_first_row, - cfa.red_in_first_col - )), - ColorInterpretation::Rgb => Ok(indoc!( - " - dtype3 read_pixel(uvec2 pos) { - uint offset = pos.y * IN_WIDTH * 3 + pos.x * 3; - dtype r = read_sample(offset + 0); - dtype g = read_sample(offset + 1); - dtype b = read_sample(offset + 2); - return dtype3(r, g, b); - } - " - ) - .to_string()), - ColorInterpretation::Rgba => Ok(indoc!( - " - dtype4 read_pixel(uvec2 pos) { - uint offset = pos.y * IN_WIDTH * 4 + pos.x * 4; - dtype r = read_sample(offset + 0); - dtype g = read_sample(offset + 1); - dtype b = read_sample(offset + 2); - dtype a = read_sample(offset + 3); - return dtype3(r, g, b, a); - } - " - ) - .to_string()), - } -} -fn write_pixel_function(ci: ColorInterpretation) -> Result { - match ci { - ColorInterpretation::Bayer(cfa) => Ok(formatdoc!( - " - #define OUT_CFA_RED_IN_FIRST_ROW {} - #define OUT_CFA_RED_IN_FIRST_COL {} - - void write_pixel(uvec2 pos, dtype v) {{ - write_sample(pos.y * WIDTH + pos.x, v); - }} - ", - cfa.red_in_first_row, - cfa.red_in_first_col - )), - ColorInterpretation::Rgb => Ok(indoc!( - " - void write_pixel(uvec2 pos, dtype3 v) { - uint offset = pos.y * WIDTH * 3 + pos.x * 3; - write_sample(offset + 0, v.r); - write_sample(offset + 1, v.g); - write_sample(offset + 2, v.b); - } - " - ) - .to_string()), - ColorInterpretation::Rgba => Ok(indoc!( - " - void write_pixel(uvec2 pos, dtype4 v) { - uint offset = pos.y * WIDTH * 4 + pos.x * 4; - write_sample(offset + 0, v.r); - write_sample(offset + 1, v.g); - write_sample(offset + 2, v.b); - write_sample(offset + 3, 
v.a); - } - " - ) - .to_string()), - } -} diff --git a/src/nodes_io/frameserver_cinema_dng.rs b/src/nodes_io/frameserver_cinema_dng.rs index 82a10c9..ee527d8 100644 --- a/src/nodes_io/frameserver_cinema_dng.rs +++ b/src/nodes_io/frameserver_cinema_dng.rs @@ -1,10 +1,8 @@ -use crate::{ - nodes_io::writer_cinema_dng::frame_to_dng_ifd, - pipeline_processing::{ - node::{InputProcessingNode, NodeID, ProgressUpdate, Request, SinkNode}, - parametrizable::prelude::*, - processing_context::ProcessingContext, - }, +use crate::pipeline_processing::{ + frame::Raw, + node::{InputProcessingNode, NodeID, ProgressUpdate, Request, SinkNode}, + parametrizable::prelude::*, + processing_context::ProcessingContext, }; use anyhow::{Context, Result}; use async_trait::async_trait; @@ -26,7 +24,14 @@ use dav_server::{ DavHandler, }; use derivative::Derivative; -use dng::{ifd::Ifd, yaml::IfdYamlParser, DngWriter, FileType}; +use dng::{ + ifd::{Ifd, IfdValue}, + tags, + tags::IfdType, + yaml::IfdYamlParser, + DngWriter, + FileType, +}; use futures::{future, FutureExt}; use hyper::{ body::{Buf, Bytes}, @@ -62,7 +67,7 @@ impl Parameterizable for CinemaDngFrameserver { fn describe_parameters() -> ParametersDescriptor { ParametersDescriptor::new() .with("input", Mandatory(NodeInputParameter)) - .with("priority", WithDefault(U8(), ParameterValue::IntRangeValue(0))) + .with("priority", Optional(U8())) .with("host", WithDefault(StringParameter, StringValue("127.0.0.1".to_string()))) .with("port", Optional(IntRange(0, u16::MAX as i64))) .with("dcp-yaml", Optional(StringParameter)) @@ -76,26 +81,22 @@ impl Parameterizable for CinemaDngFrameserver { where Self: Sized, { - let port = if let Some(port) = parameters.take_option::("port")? 
{ - port as u16 - } else { - portpicker::pick_unused_port().unwrap() - }; - + let port = parameters.take::("port")?; + let port = if port == 0 { portpicker::pick_unused_port().unwrap() } else { port as u16 }; let host = parameters.take::("host")?; let address = SocketAddr::from((IpAddr::from_str(&host)?, port)); let mut base_ifd = IfdYamlParser::default().parse_from_str(include_str!("./base_ifd.yml"))?; - let dcp_ifd = if let Some(path) = parameters.take_option::("dcp-yaml")? { - let path = PathBuf::from_str(&path)?; + let path_string = parameters.take::("dcp-yaml").unwrap_or("".to_string()); + let dcp_ifd = if path_string.is_empty() { + IfdYamlParser::default().parse_from_str(include_str!("./default_dcp.yml"))? + } else { + let path = PathBuf::from_str(&path_string)?; let data = fs::read_to_string(path.clone()).context("couldnt read dcp-yaml file")?; IfdYamlParser::new(path).parse_from_str(&data).context("couldnt parse dcp-yaml file")? - } else { - IfdYamlParser::default().parse_from_str(include_str!("./default_dcp.yml"))? 
}; - base_ifd.insert_from_other(dcp_ifd); Ok(Self { @@ -128,10 +129,36 @@ impl SinkNode for CinemaDngFrameserver { let payload = input.pull(Request::new(priority, i)).await?; let frame = context - .ensure_cpu_buffer_frame(&payload) - .context("Wrong input format for CinemaDng")?; + .ensure_cpu_buffer::(&payload) + .context("Wrong input format for CinemaDngWriter")?; + + + let mut ifd = Ifd::new(IfdType::Ifd); + ifd.insert_from_other(base_ifd.clone()); + + ifd.insert(tags::ifd::ImageWidth, frame.interp.width as u32); + ifd.insert(tags::ifd::ImageLength, frame.interp.height as u32); + ifd.insert(tags::ifd::RowsPerStrip, frame.interp.height as u32); + ifd.insert( + tags::ifd::FrameRate, + IfdValue::SRational((frame.interp.fps * 10000.0) as i32, 10000), + ); + ifd.insert(tags::ifd::BitsPerSample, frame.interp.bit_depth as u32); + ifd.insert( + tags::ifd::CFAPattern, + match (frame.interp.cfa.red_in_first_row, frame.interp.cfa.red_in_first_col) { + (true, true) => [0u8, 1, 1, 2], + (true, false) => [1, 0, 2, 1], + (false, true) => [1, 2, 0, 1], + (false, false) => [2, 1, 1, 0], + }, + ); - let ifd = frame_to_dng_ifd(frame, base_ifd)?; + ifd.insert( + tags::ifd::StripOffsets, + IfdValue::Offsets(Arc::new(frame.storage.clone())), + ); + ifd.insert(tags::ifd::StripByteCounts, frame.storage.len() as u32); let mut buffer = Cursor::new(Vec::new()); DngWriter::write_dng(&mut buffer, true, FileType::Dng, vec![ifd])?; diff --git a/src/nodes_io/reader_cinema_dng.rs b/src/nodes_io/reader_cinema_dng.rs index b2058f6..de02d6b 100644 --- a/src/nodes_io/reader_cinema_dng.rs +++ b/src/nodes_io/reader_cinema_dng.rs @@ -1,18 +1,11 @@ use crate::pipeline_processing::{ - frame::{ - CfaDescriptor, - ColorInterpretation, - Compression, - Frame, - FrameInterpretation, - SampleInterpretation, - }, + frame::{CfaDescriptor, Frame, Raw}, node::{Caps, NodeID, ProcessingNode, Request}, parametrizable::prelude::*, payload::Payload, processing_context::ProcessingContext, }; -use anyhow::{anyhow, 
bail, Context, Result}; +use anyhow::{anyhow, Context, Result}; use async_trait::async_trait; use dng::{tags, DngReader}; use glob::glob; @@ -40,7 +33,7 @@ impl Parameterizable for CinemaDngReader { mut options: Parameters, _is_input_to: &[NodeID], context: &ProcessingContext, - ) -> Result + ) -> anyhow::Result where Self: Sized, { @@ -52,8 +45,8 @@ impl Parameterizable for CinemaDngReader { } Ok(Self { files, - cache_frames: options.has("cache-frames"), - internal_loop: options.has("internal-loop"), + cache_frames: options.take("cache-frames")?, + internal_loop: options.take("internal-loop")?, cache: Mutex::new((0..frame_count).map(|_| None).collect()), context: context.clone(), }) @@ -115,46 +108,20 @@ impl ProcessingNode for CinemaDngReader { .map(|x| x as u64) }; - let fps = dng - .get_entry_by_path(&main_ifd.chain_tag(tags::ifd::FrameRate)) - .map(|v| { - v.value - .as_f64() - .ok_or(anyhow!("couldnt interpret frame rate of DNG {path:?} as f64")) - }) - .transpose()?; - - - let bits_per_sample = get_tag_as_u32(tags::ifd::BitsPerSample)?; - let sample_interpretation = match get_tag_as_u32(tags::ifd::SampleFormat)? { - 1 => { - // uint - SampleInterpretation::UInt(bits_per_sample as u8) - } - 3 => { - // IEEE float - if bits_per_sample == 16 { - SampleInterpretation::FP16 - } else if bits_per_sample == 32 { - SampleInterpretation::FP32 - } else { - bail!("DNG is IEEE float with bits_per_sample={bits_per_sample}. 
This is unsupported") - } - } - other => bail!("Unknown SampleFormat {other}"), - }; - - - let interpretation = FrameInterpretation { + let interp = Raw { width: get_tag_as_u32(tags::ifd::ImageWidth)?, height: get_tag_as_u32(tags::ifd::ImageLength)?, - fps, - color_interpretation: ColorInterpretation::Bayer(cfa), - sample_interpretation, - compression: Compression::Uncompressed, + bit_depth: get_tag_as_u32(tags::ifd::BitsPerSample)?, + fps: dng + .get_entry_by_path(&main_ifd.chain_tag(tags::ifd::FrameRate)) + .ok_or(anyhow!("couldnt read frame rate of DNG {path:?}"))? + .value + .as_f64() + .ok_or(anyhow!("couldnt interpret frame rate of DNG {path:?} as f64"))?, + cfa, }; - let payload = Payload::from(Frame { storage: buffer, interpretation }); + let payload = Payload::from(Frame { storage: buffer, interp }); if self.cache_frames { self.cache.lock().unwrap()[frame_number as usize] = Some(payload.clone()); diff --git a/src/nodes_io/reader_raw.rs b/src/nodes_io/reader_raw.rs index ddc4055..28b3811 100644 --- a/src/nodes_io/reader_raw.rs +++ b/src/nodes_io/reader_raw.rs @@ -1,5 +1,5 @@ use crate::pipeline_processing::{ - frame::{Frame, FrameInterpretation}, + frame::{Frame, FrameInterpretation, FrameInterpretations}, node::{Caps, NodeID, ProcessingNode, Request}, parametrizable::prelude::*, payload::Payload, @@ -18,7 +18,7 @@ use std::{ pub struct RawBlobReader { file: Mutex, - interpretation: FrameInterpretation, + interp: FrameInterpretations, cache_frames: bool, cache: Mutex>>, frame_count: u64, @@ -38,20 +38,20 @@ impl Parameterizable for RawBlobReader { mut options: Parameters, _is_input_to: &[NodeID], context: &ProcessingContext, - ) -> Result + ) -> anyhow::Result where Self: Sized, { let path: String = options.take("file")?; let file = File::open(path)?; - let interpretation = options.get_interpretation()?; - let frame_count = file.metadata()?.len() / interpretation.required_bytes() as u64; + let interp = options.get_interpretation()?; + let frame_count = 
file.metadata()?.len() / interp.required_bytes() as u64; Ok(Self { file: Mutex::new(file), - interpretation, + interp, frame_count, - cache_frames: options.has("cache-frames"), + cache_frames: options.take("cache-frames")?, cache: Mutex::new((0..frame_count).map(|_| None).collect()), context: context.clone(), }) @@ -70,10 +70,10 @@ impl ProcessingNode for RawBlobReader { } let mut file = self.file.lock().unwrap(); - file.seek(SeekFrom::Start(frame_number * self.interpretation.required_bytes() as u64))?; + file.seek(SeekFrom::Start(frame_number * self.interp.required_bytes() as u64))?; let mut buffer = - unsafe { self.context.get_uninit_cpu_buffer(self.interpretation.required_bytes()) }; + unsafe { self.context.get_uninit_cpu_buffer(self.interp.required_bytes()) }; buffer .as_mut_slice(|buffer| file.read_exact(buffer).context("error while reading file"))?; @@ -83,8 +83,11 @@ impl ProcessingNode for RawBlobReader { } } - let payload = - Payload::from(Frame { storage: buffer, interpretation: self.interpretation.clone() }); + let payload = match self.interp { + FrameInterpretations::Raw(interp) => Payload::from(Frame { storage: buffer, interp }), + FrameInterpretations::Rgb(interp) => Payload::from(Frame { storage: buffer, interp }), + FrameInterpretations::Rgba(interp) => Payload::from(Frame { storage: buffer, interp }), + }; self.cache.lock().unwrap()[frame_number as usize] = Some(payload.clone()); Ok(payload) @@ -98,7 +101,7 @@ impl ProcessingNode for RawBlobReader { pub struct RawDirectoryReader { files: Vec, - interpretation: FrameInterpretation, + interp: FrameInterpretations, cache_frames: bool, internal_loop: bool, cache: Mutex>>, @@ -119,7 +122,7 @@ impl Parameterizable for RawDirectoryReader { mut options: Parameters, _is_input_to: &[NodeID], context: &ProcessingContext, - ) -> Result + ) -> anyhow::Result where Self: Sized, { @@ -131,9 +134,9 @@ impl Parameterizable for RawDirectoryReader { } Ok(Self { files, - interpretation: options.get_interpretation()?, 
- cache_frames: options.has("cache-frames"), - internal_loop: options.has("internal-loop"), + interp: options.get_interpretation()?, + cache_frames: options.take("cache-frames")?, + internal_loop: options.take("internal-loop")?, cache: Mutex::new((0..frame_count).map(|_| None).collect()), context: context.clone(), }) @@ -164,12 +167,16 @@ impl ProcessingNode for RawDirectoryReader { let path = &self.files[frame_number as usize]; let mut file = File::open(path)?; let mut buffer = - unsafe { self.context.get_uninit_cpu_buffer(self.interpretation.required_bytes()) }; + unsafe { self.context.get_uninit_cpu_buffer(self.interp.required_bytes()) }; buffer .as_mut_slice(|buffer| file.read_exact(buffer).context("error while reading file"))?; - let payload = - Payload::from(Frame { storage: buffer, interpretation: self.interpretation.clone() }); + + let payload = match self.interp { + FrameInterpretations::Raw(interp) => Payload::from(Frame { storage: buffer, interp }), + FrameInterpretations::Rgb(interp) => Payload::from(Frame { storage: buffer, interp }), + FrameInterpretations::Rgba(interp) => Payload::from(Frame { storage: buffer, interp }), + }; if self.cache_frames { self.cache.lock().unwrap()[frame_number as usize] = Some(payload.clone()); diff --git a/src/nodes_io/reader_tcp.rs b/src/nodes_io/reader_tcp.rs index 66b2966..d2c165b 100644 --- a/src/nodes_io/reader_tcp.rs +++ b/src/nodes_io/reader_tcp.rs @@ -1,6 +1,6 @@ use crate::{ pipeline_processing::{ - frame::{Frame, FrameInterpretation}, + frame::{Frame, FrameInterpretation, FrameInterpretations}, node::{Caps, EOFError, NodeID, ProcessingNode, Request}, parametrizable::prelude::*, payload::Payload, @@ -14,7 +14,7 @@ use std::{io::Read, net::TcpStream, sync::Mutex}; pub struct TcpReader { tcp_connection: Mutex, - interpretation: FrameInterpretation, + interp: FrameInterpretations, notifier: AsyncNotifier, context: ProcessingContext, } @@ -35,7 +35,7 @@ impl Parameterizable for TcpReader { { Ok(Self { tcp_connection: 
Mutex::new(TcpStream::connect(parameters.take::("address")?)?), - interpretation: parameters.get_interpretation()?, + interp: parameters.get_interpretation()?, notifier: Default::default(), context: context.clone(), }) @@ -50,14 +50,18 @@ impl ProcessingNode for TcpReader { self.notifier.wait(move |x| *x >= frame_number).await; let mut buffer = - unsafe { self.context.get_uninit_cpu_buffer(self.interpretation.required_bytes()) }; + unsafe { self.context.get_uninit_cpu_buffer(self.interp.required_bytes()) }; buffer .as_mut_slice(|slice| self.tcp_connection.lock().unwrap().read_exact(slice)) .context(EOFError)?; self.notifier.update(|x| *x = frame_number + 1); - let payload = Payload::from(Frame { storage: buffer, interpretation: self.interpretation }); + let payload = match self.interp { + FrameInterpretations::Raw(interp) => Payload::from(Frame { storage: buffer, interp }), + FrameInterpretations::Rgb(interp) => Payload::from(Frame { storage: buffer, interp }), + FrameInterpretations::Rgba(interp) => Payload::from(Frame { storage: buffer, interp }), + }; Ok(payload) } diff --git a/src/nodes_io/reader_webcam.rs b/src/nodes_io/reader_webcam.rs index 35869f4..076c2de 100644 --- a/src/nodes_io/reader_webcam.rs +++ b/src/nodes_io/reader_webcam.rs @@ -1,6 +1,6 @@ use crate::{ pipeline_processing::{ - frame::{Frame, FrameInterpretation, Rgb, SampleInterpretation}, + frame::{Frame, FrameInterpretation, Rgb}, node::{Caps, NodeID, ProcessingNode, Request}, parametrizable::prelude::*, payload::Payload, @@ -27,7 +27,7 @@ use v4l2_sys_mit::*; pub struct WebcamInput { queue: AsyncNotifier<(u64, u64)>, stream: RwLock, - interpretation: FrameInterpretation, + interp: Rgb, context: ProcessingContext, } @@ -36,7 +36,7 @@ impl Parameterizable for WebcamInput { Some("read frames from a webcam (or webcam like source like a frame-grabber)"); fn describe_parameters() -> ParametersDescriptor { - ParametersDescriptor::new().with("device", WithDefault(NaturalWithZero(), IntRangeValue(0))) + 
ParametersDescriptor::new().with("device", Optional(NaturalWithZero())) } fn from_parameters( mut options: Parameters, @@ -46,21 +46,14 @@ impl Parameterizable for WebcamInput { let dev = Device::new(options.take::("device")? as usize).expect("Failed to open device"); let format = dev.format()?; - let interpretation = FrameInterpretation { - width: format.width as u64, - height: format.height as u64, - fps: 10000.0, // TODO: this is a dirty hack - color_interpretation: ColorInterpretation::Rgb, - sample_interpretation: SampleInterpretation::UInt(8), - compression: Compression::Uncompressed, - }; + let interp = Rgb { width: format.width as u64, height: format.height as u64, fps: 10000.0 }; let mut stream = CpuBufferQueueManager::new(&dev); stream.start(); Ok(Self { queue: Default::default(), stream: RwLock::new(stream), - interpretation, + interp, context: context.clone(), }) } @@ -97,7 +90,7 @@ impl ProcessingNode for WebcamInput { // frame, metadata.sequence let mut buffer = - unsafe { self.context.get_uninit_cpu_buffer(self.interpretation.required_bytes()) }; + unsafe { self.context.get_uninit_cpu_buffer(self.interp.required_bytes()) }; buffer.as_mut_slice(|buffer| { for (src, dst) in frame.chunks_exact(3).zip(buffer.chunks_exact_mut(3)) { dst[0] = src[2]; @@ -106,7 +99,7 @@ impl ProcessingNode for WebcamInput { } }); - return Ok(Payload::from(Frame { storage: buffer, interpretation: self.interpretation })); + return Ok(Payload::from(Frame { storage: buffer, interp: self.interp })); } fn get_caps(&self) -> Caps { Caps { frame_count: None, random_access: false } } @@ -117,7 +110,6 @@ pub struct CpuBufferQueueManager { buffer_size: usize, buffers: Vec>>, } - impl CpuBufferQueueManager { fn new(dev: &Device) -> Self { let handle = dev.handle(); diff --git a/src/nodes_io/writer_cinema_dng.rs b/src/nodes_io/writer_cinema_dng.rs index 3abad46..d17ef10 100644 --- a/src/nodes_io/writer_cinema_dng.rs +++ b/src/nodes_io/writer_cinema_dng.rs @@ -1,12 +1,12 @@ use 
crate::pipeline_processing::{ buffers::CpuBuffer, - frame::{ColorInterpretation, Frame, SampleInterpretation}, + frame::Raw, node::{InputProcessingNode, NodeID, ProgressUpdate, SinkNode}, parametrizable::prelude::*, processing_context::ProcessingContext, puller::pull_unordered, }; -use anyhow::{bail, Context, Result}; +use anyhow::{Context, Result}; use async_trait::async_trait; use dng::{ ifd::{Ifd, IfdValue, Offsets}, @@ -30,7 +30,7 @@ use std::{ pub struct CinemaDngWriter { dir_path: String, input: InputProcessingNode, - number_of_frames: Option, + number_of_frames: u64, priority: u8, base_ifd: Ifd, } @@ -42,8 +42,8 @@ impl Parameterizable for CinemaDngWriter { ParametersDescriptor::new() .with("input", Mandatory(NodeInputParameter)) .with("path", Mandatory(StringParameter)) - .with("priority", WithDefault(U8(), IntRangeValue(0))) - .with("number-of-frames", Optional(NaturalGreaterZero())) + .with("priority", Optional(U8())) + .with("number-of-frames", Optional(NaturalWithZero())) .with("dcp-yaml", Optional(StringParameter)) } @@ -58,12 +58,13 @@ impl Parameterizable for CinemaDngWriter { let mut base_ifd = IfdYamlParser::default().parse_from_str(include_str!("./base_ifd.yml"))?; - let dcp_ifd = if let Some(path) = parameters.take_option::("dcp-yaml")? { - let path = PathBuf::from_str(&path)?; + let path_string = parameters.take::("dcp-yaml").unwrap_or("".to_string()); + let dcp_ifd = if path_string.is_empty() { + IfdYamlParser::default().parse_from_str(include_str!("./default_dcp.yml"))? + } else { + let path = PathBuf::from_str(&path_string)?; let data = fs::read_to_string(path.clone()).context("couldnt read dcp-yaml file")?; IfdYamlParser::new(path).parse_from_str(&data).context("couldnt parse dcp-yaml file")? - } else { - IfdYamlParser::default().parse_from_str(include_str!("./default_dcp.yml"))? 
}; base_ifd.insert_from_other(dcp_ifd); @@ -74,7 +75,7 @@ impl Parameterizable for CinemaDngWriter { Ok(Self { dir_path: filename, input: parameters.take("input")?, - number_of_frames: parameters.take_option("number-of-frames")?, + number_of_frames: parameters.take("number-of-frames")?, priority: parameters.take("priority")?, base_ifd, }) @@ -100,10 +101,36 @@ impl SinkNode for CinemaDngWriter { self.number_of_frames, move |input, frame_number| { let frame = context - .ensure_cpu_buffer_frame(&input) + .ensure_cpu_buffer::(&input) .context("Wrong input format for CinemaDngWriter")?; - let ifd = frame_to_dng_ifd(frame, base_ifd.clone())?; + + let mut ifd = Ifd::new(IfdType::Ifd); + ifd.insert_from_other(base_ifd.clone()); + + ifd.insert(tags::ifd::ImageWidth, frame.interp.width as u32); + ifd.insert(tags::ifd::ImageLength, frame.interp.height as u32); + ifd.insert(tags::ifd::RowsPerStrip, frame.interp.height as u32); + ifd.insert( + tags::ifd::FrameRate, + IfdValue::SRational((frame.interp.fps * 10000.0) as i32, 10000), + ); + ifd.insert(tags::ifd::BitsPerSample, frame.interp.bit_depth as u32); + ifd.insert( + tags::ifd::CFAPattern, + match (frame.interp.cfa.red_in_first_row, frame.interp.cfa.red_in_first_col) { + (true, true) => [0u8, 1, 1, 2], + (true, false) => [1, 0, 2, 1], + (false, true) => [1, 2, 0, 1], + (false, false) => [2, 1, 1, 0], + }, + ); + + ifd.insert( + tags::ifd::StripOffsets, + IfdValue::Offsets(Arc::new(frame.storage.clone())), + ); + ifd.insert(tags::ifd::StripByteCounts, frame.storage.len() as u32); let file = File::create(format!("{}/{:06}.dng", &dir_path, frame_number))?; DngWriter::write_dng(file, true, FileType::Dng, vec![ifd])?; @@ -121,50 +148,3 @@ impl Offsets for CpuBuffer { self.as_slice(|slice| writer.write_all(slice)) } } - - -pub fn frame_to_dng_ifd(frame: Arc>, base_ifd: Ifd) -> Result { - let mut ifd = Ifd::new(IfdType::Ifd); - ifd.insert_from_other(base_ifd); - - if let ColorInterpretation::Bayer(cfa) = 
frame.interpretation.color_interpretation { - ifd.insert( - tags::ifd::CFAPattern, - match (cfa.red_in_first_row, cfa.red_in_first_col) { - (true, true) => [0u8, 1, 1, 2], - (true, false) => [1, 0, 2, 1], - (false, true) => [1, 2, 0, 1], - (false, false) => [2, 1, 1, 0], - }, - ); - } else { - bail!("cant write non-bayer image as DNG") - } - - match frame.interpretation.sample_interpretation { - SampleInterpretation::UInt(bits) => { - ifd.insert(tags::ifd::BitsPerSample, bits as u32); - ifd.insert(tags::ifd::SampleFormat, 1); - } - SampleInterpretation::FP16 => { - ifd.insert(tags::ifd::BitsPerSample, 16); - ifd.insert(tags::ifd::SampleFormat, 3); - } - SampleInterpretation::FP32 => { - ifd.insert(tags::ifd::BitsPerSample, 32); - ifd.insert(tags::ifd::SampleFormat, 3); - } - } - - ifd.insert(tags::ifd::ImageWidth, frame.interpretation.width as u32); - ifd.insert(tags::ifd::ImageLength, frame.interpretation.height as u32); - ifd.insert(tags::ifd::RowsPerStrip, frame.interpretation.height as u32); - if let Some(fps) = frame.interpretation.fps { - ifd.insert(tags::ifd::FrameRate, IfdValue::SRational((fps * 10000.0) as i32, 10000)); - } - - ifd.insert(tags::ifd::StripOffsets, IfdValue::Offsets(Arc::new(frame.storage.clone()))); - ifd.insert(tags::ifd::StripByteCounts, frame.storage.len() as u32); - - Ok(ifd) -} diff --git a/src/nodes_io/writer_ffmpeg.rs b/src/nodes_io/writer_ffmpeg.rs index 1f4d373..b936bbd 100644 --- a/src/nodes_io/writer_ffmpeg.rs +++ b/src/nodes_io/writer_ffmpeg.rs @@ -1,11 +1,11 @@ use crate::pipeline_processing::{ - frame::{ColorInterpretation, SampleInterpretation}, + frame::Rgb, node::{InputProcessingNode, NodeID, ProgressUpdate, SinkNode}, parametrizable::prelude::*, processing_context::ProcessingContext, puller::pull_ordered, }; -use anyhow::{anyhow, bail, Context, Result}; +use anyhow::{Context, Result}; use async_trait::async_trait; use std::{ io::Write, @@ -24,8 +24,8 @@ impl Parameterizable for FfmpegWriter { ParametersDescriptor::new() 
.with("input", Mandatory(NodeInputParameter)) .with("output", Mandatory(StringParameter)) - .with("priority", WithDefault(U8(), ParameterValue::IntRangeValue(0))) - .with("input-options", WithDefault(StringParameter, StringValue("".to_string()))) + .with("priority", Optional(U8())) + .with("input-options", Optional(StringParameter)) } fn from_parameters( mut parameters: Parameters, @@ -56,27 +56,18 @@ impl SinkNode for FfmpegWriter { self.priority, progress_callback, self.input.clone_for_same_puller(), - None, + 0, ); let mut frame = context - .ensure_cpu_buffer_frame(&rx.recv_async().await.unwrap()) + .ensure_cpu_buffer::(&rx.recv_async().await.unwrap()) .context("Wrong input format for FfmpegWriter")?; let input_options = &self.input_options; - let fps = frame.interpretation.fps.ok_or(anyhow!("need to know fps to write video"))?; - let width = frame.interpretation.width; - let height = frame.interpretation.height; + let fps = frame.interp.fps; + let width = frame.interp.width; + let height = frame.interp.height; let output = &self.output; - if !matches!(frame.interpretation.sample_interpretation, SampleInterpretation::UInt(8)) { - bail!("A frame with bit_depth=8 is required. 
Convert the bit depth of the frame!") - } - let pixel_format = match frame.interpretation.color_interpretation { - ColorInterpretation::Bayer(_) => bail!("cant write bayer video with ffmpeg!"), - ColorInterpretation::Rgb => "rgb24", - ColorInterpretation::Rgba => "rgba", - }; - - let args_string = format!("{input_options} -f rawvideo -framerate {fps} -video_size {width}x{height} -pixel_format {pixel_format} -i - {output}"); + let args_string = format!("{input_options} -f rawvideo -framerate {fps} -video_size {width}x{height} -pixel_format rgb24 -i - {output}"); let mut child = Command::new("ffmpeg") .args(shlex::split(&args_string).unwrap()) @@ -88,7 +79,7 @@ impl SinkNode for FfmpegWriter { if let Ok(payload) = rx.recv_async().await { frame = context - .ensure_cpu_buffer_frame(&payload) + .ensure_cpu_buffer::(&payload) .context("Wrong input format for FfmpegWriter")?; } else { break; diff --git a/src/nodes_io/writer_raw.rs b/src/nodes_io/writer_raw.rs index 705bd1c..632320b 100644 --- a/src/nodes_io/writer_raw.rs +++ b/src/nodes_io/writer_raw.rs @@ -16,7 +16,7 @@ use std::{ pub struct RawBlobWriter { file: Arc>, input: InputProcessingNode, - number_of_frames: Option, + number_of_frames: u64, priority: u8, } impl Parameterizable for RawBlobWriter { @@ -24,7 +24,7 @@ impl Parameterizable for RawBlobWriter { ParametersDescriptor::new() .with("path", Mandatory(StringParameter)) .with("input", Mandatory(NodeInputParameter)) - .with("priority", WithDefault(U8(), ParameterValue::IntRangeValue(0))) + .with("priority", Optional(U8())) .with("number-of-frames", Optional(NaturalWithZero())) } fn from_parameters( @@ -38,7 +38,7 @@ impl Parameterizable for RawBlobWriter { Ok(Self { file: Arc::new(Mutex::new(File::create(parameters.take::("path")?)?)), input: parameters.take("input")?, - number_of_frames: parameters.take_option("number-of-frames")?, + number_of_frames: parameters.take("number-of-frames")?, priority: parameters.take("priority")?, }) } @@ -59,8 +59,8 @@ impl 
SinkNode for RawBlobWriter { self.number_of_frames, ); while let Ok(payload) = rx.recv_async().await { - let frame = context.ensure_cpu_buffer_frame(&payload)?; - frame.storage.as_slice(|slice| self.file.lock().unwrap().write_all(slice))?; + let buffer = context.ensure_any_cpu_buffer(&payload)?; + buffer.as_slice(|slice| self.file.lock().unwrap().write_all(slice))?; } Ok(()) @@ -70,7 +70,7 @@ impl SinkNode for RawBlobWriter { pub struct RawDirectoryWriter { dir_path: String, input: InputProcessingNode, - number_of_frames: Option, + number_of_frames: u64, priority: u8, } impl Parameterizable for RawDirectoryWriter { @@ -78,7 +78,7 @@ impl Parameterizable for RawDirectoryWriter { ParametersDescriptor::new() .with("path", Mandatory(StringParameter)) .with("input", Mandatory(NodeInputParameter)) - .with("priority", WithDefault(U8(), ParameterValue::IntRangeValue(0))) + .with("priority", Optional(U8())) .with("number-of-frames", Optional(NaturalWithZero())) } @@ -95,7 +95,7 @@ impl Parameterizable for RawDirectoryWriter { Ok(Self { dir_path: filename, input: parameters.take("input")?, - number_of_frames: parameters.take_option("number-of-frames")?, + number_of_frames: parameters.take("number-of-frames")?, priority: parameters.take("priority")?, }) } @@ -116,9 +116,9 @@ impl SinkNode for RawDirectoryWriter { self.input.clone_for_same_puller(), self.number_of_frames, move |payload, frame_number| { - let frame = context_clone.ensure_cpu_buffer_frame(&payload)?; + let buffer = context_clone.ensure_any_cpu_buffer(&payload)?; let mut file = File::create(format!("{}/{:06}.data", &dir_path, frame_number))?; - frame.storage.as_slice(|slice| file.write_all(slice))?; + buffer.as_slice(|slice| file.write_all(slice))?; Ok(()) }, ) diff --git a/src/nodes_util/cache.rs b/src/nodes_util/cache.rs index 596a7b2..cbbde5d 100644 --- a/src/nodes_util/cache.rs +++ b/src/nodes_util/cache.rs @@ -23,7 +23,7 @@ impl Parameterizable for Cache { fn describe_parameters() -> ParametersDescriptor { 
ParametersDescriptor::new() .with("input", Mandatory(NodeInputParameter)) - .with("size", WithDefault(NaturalGreaterZero(), IntRangeValue(1))) + .with("size", Optional(NaturalGreaterZero())) } fn from_parameters( diff --git a/src/nodes_util/mod.rs b/src/nodes_util/mod.rs index 020b574..4b03cf6 100644 --- a/src/nodes_util/mod.rs +++ b/src/nodes_util/mod.rs @@ -1,3 +1,2 @@ pub mod cache; -pub mod null_source; pub mod split; diff --git a/src/nodes_util/null_source.rs b/src/nodes_util/null_source.rs deleted file mode 100644 index 18b5502..0000000 --- a/src/nodes_util/null_source.rs +++ /dev/null @@ -1,52 +0,0 @@ -use crate::pipeline_processing::{ - frame::{Frame, FrameInterpretation}, - node::{Caps, NodeID, ProcessingNode, Request}, - parametrizable::{Parameterizable, Parameters, ParametersDescriptor}, - payload::Payload, - processing_context::ProcessingContext, -}; -use async_trait::async_trait; - -#[derive(Clone)] -pub struct NullFrameSource { - pub context: ProcessingContext, - pub interpretation: FrameInterpretation, -} - -impl Parameterizable for NullFrameSource { - const DESCRIPTION: Option<&'static str> = - Some("returns frames with the specified interpretation where all bytes are zero"); - - fn describe_parameters() -> ParametersDescriptor { - ParametersDescriptor::new().with_interpretation() - } - fn from_parameters( - mut parameters: Parameters, - _is_input_to: &[NodeID], - context: &ProcessingContext, - ) -> anyhow::Result - where - Self: Sized, - { - Ok(Self { context: context.clone(), interpretation: parameters.get_interpretation()? 
}) - } -} - -#[async_trait] -impl ProcessingNode for NullFrameSource { - async fn pull(&self, _request: Request) -> anyhow::Result { - let buffer = unsafe { - let mut buffer = - self.context.get_uninit_cpu_buffer(self.interpretation.required_bytes()); - buffer.as_mut_slice(|buffer| { - buffer.as_mut_ptr().write_bytes(0, buffer.len()); - }); - buffer - }; - - let payload = - Payload::from(Frame { storage: buffer, interpretation: self.interpretation.clone() }); - Ok(payload) - } - fn get_caps(&self) -> Caps { Caps { frame_count: None, random_access: true } } -} diff --git a/src/pipeline_processing/frame.rs b/src/pipeline_processing/frame.rs index cb2ec7e..9b69d1c 100644 --- a/src/pipeline_processing/frame.rs +++ b/src/pipeline_processing/frame.rs @@ -1,3 +1,5 @@ +use std::sync::Arc; + pub trait ToAny: 'static { fn as_any(&self) -> &dyn std::any::Any; } @@ -5,84 +7,133 @@ impl ToAny for T { fn as_any(&self) -> &dyn std::any::Any { self } } + +pub trait FrameInterpretation: ToAny { + fn required_bytes(&self) -> usize; + fn width(&self) -> u64; + fn height(&self) -> u64; + fn fps(&self) -> Option; +} + /// The main data structure for transferring and representing single raw frames /// of a video stream -pub struct Frame { - pub interpretation: FrameInterpretation, +pub struct Frame { + pub interp: Interpretation, pub storage: Storage, } -#[derive(Copy, Clone, Debug, PartialEq)] -pub struct FrameInterpretation { +#[derive(Debug, Copy, Clone)] +pub struct CfaDescriptor { + pub red_in_first_col: bool, + pub red_in_first_row: bool, +} + +impl CfaDescriptor { + pub fn from_first_red(red_in_first_col: bool, red_in_first_row: bool) -> Self { + CfaDescriptor { red_in_first_col, red_in_first_row } + } +} + +#[derive(Clone, Copy, Debug)] +pub struct Raw { pub width: u64, pub height: u64, - pub fps: Option, + pub bit_depth: u64, + pub cfa: CfaDescriptor, + pub fps: f64, +} - pub color_interpretation: ColorInterpretation, - pub sample_interpretation: SampleInterpretation, - pub 
compression: Compression, -} -impl FrameInterpretation { - pub fn required_bytes(&self) -> usize { - match self.compression { - Compression::Uncompressed => self.required_bytes_uncompressed(), - Compression::SZ3Compressed { size } => size, - } - } - fn required_bytes_uncompressed(&self) -> usize { - (self.width - * self.height - * self.color_interpretation.samples_per_pixel() - * self.sample_interpretation.bits_per_sample()) as usize - / 8 +impl FrameInterpretation for Raw { + fn required_bytes(&self) -> usize { + self.width as usize * self.height as usize * self.bit_depth as usize / 8 } + fn width(&self) -> u64 { self.width } + fn height(&self) -> u64 { self.height } + fn fps(&self) -> Option { Some(self.fps) } } -#[derive(Clone, Copy, Debug, Eq, PartialEq, Hash)] -pub enum ColorInterpretation { - Bayer(CfaDescriptor), - Rgb, - Rgba, -} -impl ColorInterpretation { - pub fn samples_per_pixel(&self) -> u64 { - match self { - ColorInterpretation::Bayer(_) => 1, - ColorInterpretation::Rgb => 3, - ColorInterpretation::Rgba => 4, - } - } +#[derive(Clone, Copy, Debug)] +pub struct Rgb { + pub width: u64, + pub height: u64, + pub fps: f64, } -#[derive(Copy, Clone, Debug, Eq, PartialEq, Hash)] -pub struct CfaDescriptor { - pub red_in_first_col: bool, - pub red_in_first_row: bool, +impl FrameInterpretation for Rgb { + fn required_bytes(&self) -> usize { self.width as usize * self.height as usize * 3 } + fn width(&self) -> u64 { self.width } + fn height(&self) -> u64 { self.height } + fn fps(&self) -> Option { Some(self.fps) } } -impl CfaDescriptor { - pub fn from_first_red(red_in_first_col: bool, red_in_first_row: bool) -> Self { - CfaDescriptor { red_in_first_col, red_in_first_row } + +#[derive(Clone, Copy, Debug)] +pub struct Rgba { + pub width: u64, + pub height: u64, + pub fps: f64, +} + +impl FrameInterpretation for Rgba { + fn required_bytes(&self) -> usize { self.width as usize * self.height as usize * 4 } + fn width(&self) -> u64 { self.width } + fn height(&self) -> 
u64 { self.height } + fn fps(&self) -> Option { Some(self.fps) } +} + +#[derive(Clone)] +pub struct SZ3Compressed { + inner: Arc, + compressed_size: usize, +} +impl SZ3Compressed { + pub fn new(inner: Arc, compressed_size: usize) -> Self { + Self { inner, compressed_size } + } + pub fn downcast_inner(&self) -> Option<&T> { + let v: &dyn std::any::Any = self.inner.as_any(); + v.downcast_ref() } } +impl FrameInterpretation for SZ3Compressed { + fn required_bytes(&self) -> usize { self.compressed_size } + fn width(&self) -> u64 { self.inner.width() } + fn height(&self) -> u64 { self.inner.height() } + fn fps(&self) -> Option { self.inner.fps() } +} -#[derive(Copy, Clone, Debug, Eq, PartialEq, Hash)] -pub enum SampleInterpretation { - UInt(u8), - FP16, - FP32, -} -impl SampleInterpretation { - pub fn bits_per_sample(&self) -> u64 { +#[derive(Clone, Copy, Debug)] +pub enum FrameInterpretations { + Raw(Raw), + Rgb(Rgb), + Rgba(Rgba), +} +impl FrameInterpretation for FrameInterpretations { + fn required_bytes(&self) -> usize { match self { - SampleInterpretation::UInt(bits) => *bits as _, - SampleInterpretation::FP16 => 16, - SampleInterpretation::FP32 => 32, + FrameInterpretations::Raw(interp) => interp.required_bytes(), + FrameInterpretations::Rgb(interp) => interp.required_bytes(), + FrameInterpretations::Rgba(interp) => interp.required_bytes(), + } + } + fn width(&self) -> u64 { + match self { + FrameInterpretations::Raw(interp) => interp.width(), + FrameInterpretations::Rgb(interp) => interp.width(), + FrameInterpretations::Rgba(interp) => interp.width(), + } + } + fn height(&self) -> u64 { + match self { + FrameInterpretations::Raw(interp) => interp.height(), + FrameInterpretations::Rgb(interp) => interp.height(), + FrameInterpretations::Rgba(interp) => interp.height(), + } + } + fn fps(&self) -> Option { + match self { + FrameInterpretations::Raw(interp) => interp.fps(), + FrameInterpretations::Rgb(interp) => interp.fps(), + FrameInterpretations::Rgba(interp) => 
interp.fps(), } } -} - -#[derive(Copy, Clone, Debug, Eq, PartialEq, Hash)] -pub enum Compression { - Uncompressed, - SZ3Compressed { size: usize }, } diff --git a/src/pipeline_processing/gpu_util.rs b/src/pipeline_processing/gpu_util.rs index c488567..fc6ee0a 100644 --- a/src/pipeline_processing/gpu_util.rs +++ b/src/pipeline_processing/gpu_util.rs @@ -19,10 +19,10 @@ use vulkano::{ sync::GpuFuture, }; -pub fn to_immutable_buffer( - frame: Arc>, +pub fn to_immutable_buffer( + frame: Arc>, queue: Arc, -) -> (Frame, impl GpuFuture) { +) -> (Frame, impl GpuFuture) { let device = queue.device(); let (buffer, fut) = unsafe { @@ -57,22 +57,22 @@ pub fn to_immutable_buffer( (buffer, future) }; - (Frame { interpretation: frame.interpretation.clone(), storage: buffer.into() }, fut) + (Frame { interp: frame.interp.clone(), storage: buffer.into() }, fut) } -pub fn ensure_gpu_buffer_frame( +pub fn ensure_gpu_buffer( payload: &Payload, queue: Arc, -) -> anyhow::Result<(Arc>, impl GpuFuture)> { - if let Ok(frame) = payload.downcast::>() { +) -> anyhow::Result<(Arc>, impl GpuFuture)> { + if let Ok(frame) = payload.downcast::>() { let (buf, fut) = to_immutable_buffer(frame, queue); Ok((Arc::new(buf), fut.boxed())) - } else if let Ok(frame) = payload.downcast::>() { + } else if let Ok(frame) = payload.downcast::>() { Ok((frame, vulkano::sync::now(queue.device().clone()).boxed())) } else { Err(anyhow!( - "wanted a frame with type {}, but the payload was of type {}", - std::any::type_name::>(), + "wanted a frame with interpretation {}, but the payload was of type {}", + std::any::type_name::(), payload.type_name )) } diff --git a/src/pipeline_processing/node.rs b/src/pipeline_processing/node.rs index 4fc2115..72e93ee 100644 --- a/src/pipeline_processing/node.rs +++ b/src/pipeline_processing/node.rs @@ -79,7 +79,7 @@ pub trait ProcessingNode { fn get_caps(&self) -> Caps; } -#[derive(Debug, Copy, Clone, Hash, Eq, PartialEq, Default)] +#[derive(Debug, Copy, Clone, Hash, Eq, PartialEq)] 
pub struct NodeID(u16); impl From for usize { diff --git a/src/pipeline_processing/parametrizable.rs b/src/pipeline_processing/parametrizable.rs index 5543124..c5602cc 100644 --- a/src/pipeline_processing/parametrizable.rs +++ b/src/pipeline_processing/parametrizable.rs @@ -1,15 +1,9 @@ use crate::pipeline_processing::{ - frame::{ - CfaDescriptor, - ColorInterpretation, - Compression, - FrameInterpretation, - SampleInterpretation, - }, + frame::{CfaDescriptor, FrameInterpretations, Raw, Rgb}, node::{InputProcessingNode, Node, NodeID}, processing_context::ProcessingContext, }; -use anyhow::{anyhow, bail, Context, Error, Result}; +use anyhow::{anyhow, Context, Error, Result}; use prelude::*; use std::{ any::type_name, @@ -29,14 +23,14 @@ pub enum ParameterValue { impl ParameterValue { fn clone_for_same_puller(&self) -> Self { match self { - FloatRangeValue(f) => FloatRangeValue(*f), - IntRangeValue(i) => IntRangeValue(*i), - BoolValue(b) => BoolValue(*b), - StringValue(s) => StringValue(s.clone()), + FloatRangeValue(f) => Self::FloatRangeValue(*f), + IntRangeValue(i) => Self::IntRangeValue(*i), + BoolValue(b) => Self::BoolValue(*b), + StringValue(s) => Self::StringValue(s.clone()), ListValue(l) => { - ListValue(l.iter().map(ParameterValue::clone_for_same_puller).collect()) + Self::ListValue(l.iter().map(ParameterValue::clone_for_same_puller).collect()) } - NodeInputValue(n) => NodeInputValue(n.clone_for_same_puller()), + Self::NodeInputValue(n) => Self::NodeInputValue(n.clone_for_same_puller()), } } } @@ -66,19 +60,7 @@ impl TryInto for ParameterValue { fn try_into(self) -> Result { match self { FloatRangeValue(v) => Ok(v), - _ => Err(anyhow!("cant convert a non FloatRange ParameterValue ({self:?}) to f64")), - } - } -} - - -impl TryInto for ParameterValue { - type Error = Error; - - fn try_into(self) -> Result { - match self { - FloatRangeValue(v) => Ok(v as f32), - _ => Err(anyhow!("cant convert a non FloatRange ParameterValue ({self:?}) to f32")), + _ => 
Err(anyhow!("cant convert a non FloatRange ParameterValue to f64")), } } } @@ -89,7 +71,7 @@ impl TryInto for ParameterValue { fn try_into(self) -> Result { match self { IntRangeValue(v) => Ok(v), - _ => Err(anyhow!("cant convert a non IntRange ParameterValue ({self:?}) to i64")), + _ => Err(anyhow!("cant convert a non IntRange ParameterValue to i64")), } } } @@ -100,7 +82,7 @@ impl TryInto for ParameterValue { fn try_into(self) -> Result { match self { IntRangeValue(v) => Ok(v as u64), - _ => Err(anyhow!("cant convert a non IntRange ParameterValue ({self:?}) to u64")), + _ => Err(anyhow!("cant convert a non IntRange ParameterValue to u64")), } } } @@ -112,7 +94,7 @@ impl TryInto for ParameterValue { fn try_into(self) -> Result { match self { IntRangeValue(v) => Ok(v as usize), - _ => Err(anyhow!("cant convert a non IntRange ParameterValue ({self:?}) to u64")), + _ => Err(anyhow!("cant convert a non IntRange ParameterValue to u64")), } } } @@ -123,7 +105,7 @@ impl TryInto for ParameterValue { fn try_into(self) -> Result { match self { IntRangeValue(v) => Ok(v as u8), - _ => Err(anyhow!("cant convert a non IntRange ParameterValue ({self:?}) to u8")), + _ => Err(anyhow!("cant convert a non IntRange ParameterValue to u8")), } } } @@ -134,9 +116,7 @@ impl TryInto for ParameterValue { fn try_into(self) -> Result { match self { StringValue(v) => Ok(v), - _ => Err(anyhow!( - "cant convert a non StringParameter ParameterValue ({self:?}) to string" - )), + _ => Err(anyhow!("cant convert a non StringParameter ParameterValue to string")), } } } @@ -147,7 +127,7 @@ impl TryInto for ParameterValue { fn try_into(self) -> Result { match self { BoolValue(v) => Ok(v), - _ => Err(anyhow!("cant convert a non BoolParameter ParameterValue ({self:?}) to bool")), + _ => Err(anyhow!("cant convert a non BoolParameter ParameterValue to bool")), } } } @@ -158,9 +138,7 @@ impl TryInto for ParameterValue { fn try_into(self) -> Result { match self { NodeInputValue(v) => Ok(v), - _ => 
Err(anyhow!( - "cant convert a non NodeInput ParameterValue ({self:?}) to ProcessingNode" - )), + _ => Err(anyhow!("cant convert a non NodeInput ParameterValue to ProcessingNode")), } } } @@ -176,7 +154,7 @@ impl Parameters { pub fn take(&mut self, key: &str) -> Result where - ParameterValue: TryInto, + ParameterValue: TryInto, { let parameter_value = self .values @@ -188,7 +166,7 @@ impl Parameters { // FIXME(robin): workaround to https://github.com/rust-lang/rust/issues/96634 pub fn take_vec(&mut self, key: &str) -> Result> where - ParameterValue: TryInto, + ParameterValue: TryInto, { let parameter_value = self .values @@ -202,16 +180,6 @@ impl Parameters { } } - pub fn take_option(&mut self, key: &str) -> Result> - where - ParameterValue: TryInto, - { - let parameter_value = self.values.remove(key); - parameter_value.map(|v| v.try_into()).transpose() - } - - pub fn has(&self, key: &str) -> bool { self.values.contains_key(key) } - pub(crate) fn add_inputs( mut self, puller_id: NodeID, @@ -242,61 +210,21 @@ impl Parameters { self } - pub fn get_interpretation(&mut self) -> Result { + pub fn get_interpretation(&mut self) -> Result { let width = self.take("width")?; let height = self.take("height")?; - let fps = self.take_option("fps")?; - - let sample_interpretation = { - if let Some(bits) = self.take_option::("uint-bits")? { - SampleInterpretation::UInt(bits) - } else if self.has("fp16") { - SampleInterpretation::FP16 - } else if self.has("fp32") { - SampleInterpretation::FP32 - } else { - bail!("no sample interpretation was specified") - } - }; - - let color_interpretation = { - if let Some(pattern) = self.take_option::("bayer")? 
{ - match pattern.to_uppercase().as_str() { - "RGBG" => ColorInterpretation::Bayer(CfaDescriptor { - red_in_first_col: true, - red_in_first_row: true, - }), - "BGRG" => ColorInterpretation::Bayer(CfaDescriptor { - red_in_first_col: true, - red_in_first_row: false, - }), - "GBGR" => ColorInterpretation::Bayer(CfaDescriptor { - red_in_first_col: false, - red_in_first_row: true, - }), - "GRGB" => ColorInterpretation::Bayer(CfaDescriptor { - red_in_first_col: false, - red_in_first_row: true, - }), - _ => bail!("couldn't parse CFA Pattern"), - } - } else if self.take("rgb")? { - ColorInterpretation::Rgb - } else if self.take("rgba")? { - ColorInterpretation::Rgba - } else { - bail!("no color interpretation was specified") - } - }; - - Ok(FrameInterpretation { - width, - height, - fps, - color_interpretation, - sample_interpretation, - compression: Compression::Uncompressed, - }) + let bit_depth = self.take("bit-depth")?; + let cfa = CfaDescriptor::from_first_red( + self.take("red-in-first-col")?, + self.take("red-in-first-row")?, + ); + let fps = self.take("fps")?; + + if self.take("rgb")? 
{ + Ok(FrameInterpretations::Rgb(Rgb { width, height, fps })) + } else { + Ok(FrameInterpretations::Raw(Raw { bit_depth, width, height, cfa, fps })) + } } } @@ -313,16 +241,16 @@ pub enum ParameterType { impl ParameterType { pub fn value_is_of_type(&self, value: ParameterValue) -> Result { match (self, &value) { - (StringParameter, StringValue(_)) => Ok(value), - (BoolParameter, BoolValue(_)) => Ok(value), - (FloatRange(min, max), FloatRangeValue(v)) => { + (StringParameter, ParameterValue::StringValue(_)) => Ok(value), + (BoolParameter, ParameterValue::BoolValue(_)) => Ok(value), + (FloatRange(min, max), ParameterValue::FloatRangeValue(v)) => { if (v >= min) && (v <= max) { Ok(value) } else { Err(anyhow!("value {} is not {} <= value <= {}", v, min, max)) } } - (IntRange(min, max), IntRangeValue(v)) => { + (IntRange(min, max), ParameterValue::IntRangeValue(v)) => { if (v >= min) && (v <= max) { Ok(value) } else { @@ -334,10 +262,12 @@ impl ParameterType { } pub fn parse(&self, string: &str) -> Result { match self { - StringParameter => Ok(StringValue(string.to_string())), - BoolParameter => Ok(BoolValue(string.parse()?)), - IntRange(..) => self.value_is_of_type(IntRangeValue(string.parse()?)), - FloatRange(..) => self.value_is_of_type(FloatRangeValue(string.parse()?)), + StringParameter => Ok(ParameterValue::StringValue(string.to_string())), + BoolParameter => Ok(ParameterValue::BoolValue(string.parse()?)), + IntRange(..) => self.value_is_of_type(ParameterValue::IntRangeValue(string.parse()?)), + FloatRange(..) => { + self.value_is_of_type(ParameterValue::FloatRangeValue(string.parse()?)) + } NodeInputParameter => Err(anyhow!("cant parse node input from string")), ListParameter(ty) => { let values = if string.trim().is_empty() { @@ -345,16 +275,25 @@ impl ParameterType { } else { string.split(',').map(|part| ty.parse(part)).collect::>()? 
}; - Ok(ListValue(values)) + Ok(ParameterValue::ListValue(values)) } } } + pub fn default_value(&self) -> ParameterValue { + match &self { + FloatRange(min, _) => FloatRangeValue(*min), + IntRange(min, _) => IntRangeValue(*min), + ListParameter(_) => ListValue(vec![]), + StringParameter => StringValue("".to_string()), + BoolParameter => BoolValue(false), + NodeInputParameter => panic!("no default value for node input"), + } + } } #[derive(Debug)] pub enum ParameterTypeDescriptor { Mandatory(ParameterType), - Optional(ParameterType), WithDefault(ParameterType, ParameterValue), } @@ -363,22 +302,26 @@ impl Clone for ParameterTypeDescriptor { match self { Mandatory(ty) => Mandatory(ty.clone()), WithDefault(ty, v) => WithDefault(ty.clone(), v.clone_for_same_puller()), - Optional(ty) => Optional(ty.clone()), } } } impl ParameterTypeDescriptor { - pub fn get_parameter_type(&self) -> &ParameterType { + pub fn parse(&self, string: Option<&str>) -> Result { match self { - Mandatory(pt) => pt, - Optional(pt) => pt, - WithDefault(pt, _) => pt, + Mandatory(parameter_type) => { + string.map(|s| parameter_type.parse(s)).unwrap_or_else(|| { + Err(anyhow!("parameter was not supplied but is mandatory (no default value)")) + }) + } + WithDefault(parameter_type, default_value) => string + .map(|s| parameter_type.parse(s)) + .unwrap_or_else(|| Ok(default_value.clone_for_same_puller())), } } } -#[derive(Clone, Debug)] +#[derive(Debug, Clone)] pub struct ParametersDescriptor(pub HashMap); impl Default for ParametersDescriptor { @@ -392,26 +335,17 @@ impl ParametersDescriptor { ParametersDescriptor(self.0) } pub fn with_interpretation(self) -> ParametersDescriptor { - self - // general metadata + self.with("bit-depth", WithDefault(IntRange(8, 16), IntRangeValue(12))) .with("width", Mandatory(NaturalWithZero())) .with("height", Mandatory(NaturalWithZero())) + .with("red-in-first-col", WithDefault(BoolParameter, BoolValue(true))) + .with("red-in-first-row", WithDefault(BoolParameter, 
BoolValue(true))) + .with("rgb", Optional(BoolParameter)) .with("fps", WithDefault(PositiveReal(), FloatRangeValue(24.0))) - - // buffer interpretation - .with("uint-bits", Optional(IntRange(8, 16))) - .with("fp16", Flag()) - .with("fp32", Flag()) - - - // color interpretation - .with("bayer", WithDefault(StringParameter, StringValue("RGBG".to_string()))) - .with("rgb", Flag()) - .with("rgba", Flag()) } } -#[derive(Clone, Debug)] +#[derive(Debug)] pub struct ParameterizableDescriptor { pub name: String, pub description: Option, @@ -445,20 +379,6 @@ pub trait Parameterizable { } } -impl Parameterizable for T { - fn describe_parameters() -> ParametersDescriptor { ParametersDescriptor::default() } - fn from_parameters( - _parameters: Parameters, - _is_input_to: &[NodeID], - _context: &ProcessingContext, - ) -> Result - where - Self: Sized, - { - Ok(Self::default()) - } -} - #[allow(non_snake_case)] pub mod prelude { @@ -471,7 +391,10 @@ pub mod prelude { Parameters, ParametersDescriptor, }; - pub fn Flag() -> ParameterTypeDescriptor { WithDefault(BoolParameter, BoolValue(false)) } + + pub fn Optional(ty: ParameterType) -> ParameterTypeDescriptor { + WithDefault(ty.clone(), ty.default_value()) + } pub fn NaturalWithZero() -> ParameterType { IntRange(0, i64::MAX) } pub fn NaturalGreaterZero() -> ParameterType { IntRange(1, i64::MAX) } pub fn U8() -> ParameterType { IntRange(0, u8::MAX as i64) } diff --git a/src/pipeline_processing/processing_context.rs b/src/pipeline_processing/processing_context.rs index d1f4c37..b02ca39 100644 --- a/src/pipeline_processing/processing_context.rs +++ b/src/pipeline_processing/processing_context.rs @@ -1,6 +1,6 @@ use crate::pipeline_processing::{ buffers::{CpuBuffer, GpuBuffer}, - frame::Frame, + frame::{Frame, Raw, Rgb, Rgba, SZ3Compressed}, payload::Payload, prioritized_executor::PrioritizedReactor, }; @@ -192,7 +192,19 @@ impl ProcessingContext { CpuBuffer::Vec(Arc::new(RwLock::new(vec))) } } - fn to_cpu_buffer_frame(&self, frame: 
Arc>) -> Result> { + pub fn get_init_cpu_buffer(&self, len: usize, init: u8) -> CpuBuffer { + unsafe { + let mut buf = self.get_uninit_cpu_buffer(len); + buf.as_mut_slice(|buf| { + buf.iter_mut().for_each(|v| *v = init); + }); + buf + } + } + fn to_cpu_buffer( + &self, + frame: Arc>, + ) -> Result> { let (device, queues) = self.require_vulkan()?; let queue = queues.iter().find(|&q| q.family().explicitly_supports_transfers()).unwrap().clone(); @@ -217,21 +229,44 @@ impl ProcessingContext { // dropping this future blocks this thread until the gpu finished the work drop(future); - Ok(Frame { interpretation: frame.interpretation.clone(), storage: buffer }) + Ok(Frame { interp: frame.interp.clone(), storage: buffer }) } - pub fn ensure_cpu_buffer_frame(&self, payload: &Payload) -> Result>> { - if let Ok(frame) = payload.downcast::>() { + pub fn ensure_cpu_buffer( + &self, + payload: &Payload, + ) -> anyhow::Result>> { + if let Ok(frame) = payload.downcast::>() { Ok(frame) - } else if let Ok(frame) = payload.downcast::>() { - Ok(Arc::new(self.to_cpu_buffer_frame(frame)?)) + } else if let Ok(frame) = payload.downcast::>() { + Ok(Arc::new(self.to_cpu_buffer(frame)?)) } else { Err(anyhow!( - "wanted a frame with type {}, but the payload was of type {}", - std::any::type_name::>(), + "wanted a frame with interpretation {}, but the payload was of type {}", + std::any::type_name::(), payload.type_name )) } } + pub fn ensure_any_cpu_buffer(&self, payload: &Payload) -> anyhow::Result { + macro_rules! 
conv { + ($($ty:ty),*) => { + $( + if let Ok(frame) = payload.downcast::>() { + return Ok(frame.storage.clone()); + } else if let Ok(frame) = payload.downcast::>() { + return Ok(self.to_cpu_buffer(frame)?.storage); + } + )* + }; + } + conv!(Raw, Rgb, Rgba, SZ3Compressed); + + return Err(anyhow!( + "wanted to convert frame {} to a byte array, but this was not possible", + payload.type_name + )); + } + pub fn require_vulkan(&self) -> Result<(Arc, Vec>)> { if let Some(vulkan_context) = &self.vulkan_device { Ok((vulkan_context.device.clone(), vulkan_context.queues.clone())) diff --git a/src/pipeline_processing/puller.rs b/src/pipeline_processing/puller.rs index bbfb54f..75d846d 100644 --- a/src/pipeline_processing/puller.rs +++ b/src/pipeline_processing/puller.rs @@ -24,14 +24,14 @@ pub async fn pull_unordered( output_priority: u8, progress_callback: Arc, input: InputProcessingNode, - number_of_frames: Option, + number_of_frames: u64, on_payload: impl Fn(Payload, u64) -> Result<()> + Send + Sync + Clone + 'static, ) -> Result<()> { let mut range = match (number_of_frames, input.get_caps().frame_count) { - (None, None) => 0..u64::MAX_VALUE, - (None, Some(n)) => 0..n, - (Some(n), None) => 0..n, - (Some(n), Some(m)) => 0..n.min(m), + (0, None) => 0..u64::MAX_VALUE, + (0, Some(n)) => 0..n, + (n, None) => 0..n, + (n, Some(m)) => 0..n.min(m), }; let total_frames = if range.end == u64::MAX_VALUE { None } else { Some(range.end as _) }; @@ -99,13 +99,13 @@ pub fn pull_ordered( output_priority: u8, progress_callback: Arc, input: InputProcessingNode, - number_of_frames: Option, + number_of_frames: u64, ) -> flume::Receiver { let mut range = match (number_of_frames, input.get_caps().frame_count) { - (None, None) => 0..u64::MAX_VALUE, - (None, Some(n)) => 0..n, - (Some(n), None) => 0..n, - (Some(n), Some(m)) => 0..n.min(m), + (0, None) => 0..u64::MAX_VALUE, + (0, Some(n)) => 0..n, + (n, None) => 0..n, + (n, Some(m)) => 0..n.min(m), }; let total_frames = if range.end == 
u64::MAX_VALUE { None } else { Some(range.end as _) };