diff --git a/parquet-variant-compute/src/from_json.rs b/parquet-variant-compute/src/from_json.rs index df4d7c2753ef..05207d094a25 100644 --- a/parquet-variant-compute/src/from_json.rs +++ b/parquet-variant-compute/src/from_json.rs @@ -21,7 +21,6 @@ use crate::{VariantArray, VariantArrayBuilder}; use arrow::array::{Array, ArrayRef, StringArray}; use arrow_schema::ArrowError; -use parquet_variant::VariantBuilder; use parquet_variant_json::json_to_variant; /// Parse a batch of JSON strings into a batch of Variants represented as @@ -41,10 +40,10 @@ pub fn batch_json_string_to_variant(input: &ArrayRef) -> Result VariantArrayVariantBuilder { + // append directly into the metadata and value buffers + let metadata_buffer = std::mem::take(&mut self.metadata_buffer); + let value_buffer = std::mem::take(&mut self.value_buffer); + VariantArrayVariantBuilder::new(self, metadata_buffer, value_buffer) + } +} + +/// A `VariantBuilder` that writes directly to the buffers of a `VariantArrayBuilder`. +/// +/// Note this struct implements [`VariantBuilderExt`], so it can be used +/// as a drop-in replacement for [`VariantBuilder`] in most cases. +/// +/// If [`Self::finish`] is not called, any changes will be rolled back +/// +/// See [`VariantArrayBuilder::variant_builder`] for an example +pub struct VariantArrayVariantBuilder<'a> { + /// was finish called? + finished: bool, + /// starting offset in the variant_builder's `metadata` buffer + metadata_offset: usize, + /// starting offset in the variant_builder's `value` buffer + value_offset: usize, + /// Parent array builder that this variant builder writes to. Buffers + /// have been moved into the variant builder, and must be returned on + /// drop + array_builder: &'a mut VariantArrayBuilder, + /// Builder for the in progress variant value, temporarily owns the buffers + /// from `array_builder` + variant_builder: VariantBuilder, +} + +impl<'a> VariantBuilderExt for VariantArrayVariantBuilder<'a> { + fn append_value<'m, 'v>(&mut self, value: impl Into>) { + self.variant_builder.append_value(value); + } + + fn new_list(&mut self) -> ListBuilder { + self.variant_builder.new_list() + } + + fn new_object(&mut self) -> ObjectBuilder { + self.variant_builder.new_object() + } +} + +impl<'a> VariantArrayVariantBuilder<'a> { + /// Constructs a new VariantArrayVariantBuilder + /// + /// Note this is not public as this is a structure that is logically + /// part of the [`VariantArrayBuilder`] and relies on its internal structure + fn new( + array_builder: &'a mut VariantArrayBuilder, + metadata_buffer: Vec, + value_buffer: Vec, + ) -> Self { + let metadata_offset = metadata_buffer.len(); + let value_offset = value_buffer.len(); + VariantArrayVariantBuilder { + finished: false, + metadata_offset, + value_offset, + variant_builder: VariantBuilder::new_with_buffers(metadata_buffer, value_buffer), + array_builder, + } + } + + /// Return a reference to the underlying `VariantBuilder` + pub fn inner(&self) -> &VariantBuilder { + &self.variant_builder + } + + /// Return a mutable reference to the underlying `VariantBuilder` + pub fn inner_mut(&mut self) -> &mut VariantBuilder { + &mut self.variant_builder + } + + /// Called to finish the in progress variant and write it to the underlying + /// buffers + /// + /// Note if you do not call finish, on drop any changes made to the + /// underlying buffers will be rolled back. + pub fn finish(mut self) { + self.finished = true; + // Note: buffers are returned and replaced in the drop impl + } +} + +impl<'a> Drop for VariantArrayVariantBuilder<'a> { + /// If the builder was not finished, roll back any changes made to the + /// underlying buffers (by truncating them) + fn drop(&mut self) { + let metadata_offset = self.metadata_offset; + let value_offset = self.value_offset; + + // get the buffers back from the variant builder + let (mut metadata_buffer, mut value_buffer) = + std::mem::take(&mut self.variant_builder).finish(); + + // Sanity Check: if the buffers got smaller, something went wrong (previous data was lost) + let metadata_len = metadata_buffer + .len() + .checked_sub(metadata_offset) + .expect("metadata length decreased unexpectedly"); + let value_len = value_buffer + .len() + .checked_sub(value_offset) + .expect("value length decreased unexpectedly"); + + if self.finished { + // if the object was finished, commit the changes by putting the + // offsets and lengths into the parent array builder. + self.array_builder + .metadata_locations + .push((metadata_offset, metadata_len)); + self.array_builder + .value_locations + .push((value_offset, value_len)); + self.array_builder.nulls.append_non_null(); + } else { + // if the object was not finished, truncate the buffers to the + // original offsets to roll back any changes. Note this is fast + // because truncate doesn't free any memory: it just has to drop + // elements (and u8 doesn't have a destructor) + metadata_buffer.truncate(metadata_offset); + value_buffer.truncate(value_offset); + } + + // put the buffers back into the array builder + self.array_builder.metadata_buffer = metadata_buffer; + self.array_builder.value_buffer = value_buffer; + } } fn binary_view_array_from_buffers( @@ -220,4 +390,91 @@ mod test { ); } } + + /// Test using sub builders to append variants + #[test] + fn test_variant_array_builder_variant_builder() { + let mut builder = VariantArrayBuilder::new(10); + builder.append_null(); // should not panic + builder.append_variant(Variant::from(42i32)); + + // let's make a sub-object in the next row + let mut sub_builder = builder.variant_builder(); + sub_builder + .new_object() + .with_field("foo", "bar") + .finish() + .unwrap(); + sub_builder.finish(); // must call finish to write the variant to the buffers + + // append a new list + let mut sub_builder = builder.variant_builder(); + sub_builder + .new_list() + .with_value(Variant::from(1i32)) + .with_value(Variant::from(2i32)) + .finish(); + sub_builder.finish(); + let variant_array = builder.build(); + + assert_eq!(variant_array.len(), 4); + assert!(variant_array.is_null(0)); + assert!(!variant_array.is_null(1)); + assert_eq!(variant_array.value(1), Variant::from(42i32)); + assert!(!variant_array.is_null(2)); + let variant = variant_array.value(2); + let variant = variant.as_object().expect("variant to be an object"); + assert_eq!(variant.get("foo").unwrap(), Variant::from("bar")); + assert!(!variant_array.is_null(3)); + let variant = variant_array.value(3); + let list = variant.as_list().expect("variant to be a list"); + assert_eq!(list.len(), 2); + } + + /// Test using non-finished sub builders to append variants + #[test] + fn test_variant_array_builder_variant_builder_reset() { + let mut builder = VariantArrayBuilder::new(10); + + // make a sub-object in the first row + let mut sub_builder = builder.variant_builder(); + sub_builder + .new_object() + .with_field("foo", 1i32) + .finish() + .unwrap(); + sub_builder.finish(); // must call finish to write the variant to the buffers + + // start appending an object but don't finish + let mut sub_builder = builder.variant_builder(); + sub_builder + .new_object() + .with_field("bar", 2i32) + .finish() + .unwrap(); + drop(sub_builder); // drop the sub builder without finishing it + + // make a third sub-object (this should reset the previous unfinished object) + let mut sub_builder = builder.variant_builder(); + sub_builder + .new_object() + .with_field("baz", 3i32) + .finish() + .unwrap(); + sub_builder.finish(); // must call finish to write the variant to the buffers + + let variant_array = builder.build(); + + // only the two finished objects should be present + assert_eq!(variant_array.len(), 2); + assert!(!variant_array.is_null(0)); + let variant = variant_array.value(0); + let variant = variant.as_object().expect("variant to be an object"); + assert_eq!(variant.get("foo").unwrap(), Variant::from(1i32)); + + assert!(!variant_array.is_null(1)); + let variant = variant_array.value(1); + let variant = variant.as_object().expect("variant to be an object"); + assert_eq!(variant.get("baz").unwrap(), Variant::from(3i32)); + } } diff --git a/parquet-variant-json/src/from_json.rs b/parquet-variant-json/src/from_json.rs index 3052bc504dee..355bd8008777 100644 --- a/parquet-variant-json/src/from_json.rs +++ b/parquet-variant-json/src/from_json.rs @@ -18,22 +18,28 @@ //! Module for parsing JSON strings as Variant use arrow_schema::ArrowError; -use parquet_variant::{ListBuilder, ObjectBuilder, Variant, VariantBuilder, VariantBuilderExt}; +use parquet_variant::{ListBuilder, ObjectBuilder, Variant, VariantBuilderExt}; use serde_json::{Number, Value}; -/// Converts a JSON string to Variant using [`VariantBuilder`]. The resulting `value` and `metadata` -/// buffers can be extracted using `builder.finish()` +/// Converts a JSON string to Variant to a [`VariantBuilderExt`], such as +/// [`VariantBuilder`]. +/// +/// The resulting `value` and `metadata` buffers can be +/// extracted using `builder.finish()` /// /// # Arguments /// * `json` - The JSON string to parse as Variant. /// * `variant_builder` - Object of type `VariantBuilder` used to build the vatiant from the JSON /// string /// +/// /// # Returns /// /// * `Ok(())` if successful /// * `Err` with error details if the conversion fails /// +/// [`VariantBuilder`]: parquet_variant::VariantBuilder +/// /// ```rust /// # use parquet_variant::VariantBuilder; /// # use parquet_variant_json::{ @@ -62,7 +68,7 @@ use serde_json::{Number, Value}; /// assert_eq!(json_result, serde_json::to_string(&json_value)?); /// # Ok::<(), Box>(()) /// ``` -pub fn json_to_variant(json: &str, builder: &mut VariantBuilder) -> Result<(), ArrowError> { +pub fn json_to_variant(json: &str, builder: &mut impl VariantBuilderExt) -> Result<(), ArrowError> { let json: Value = serde_json::from_str(json) .map_err(|e| ArrowError::InvalidArgumentError(format!("JSON format error: {e}")))?; @@ -70,7 +76,7 @@ pub fn json_to_variant(json: &str, builder: &mut VariantBuilder) -> Result<(), A Ok(()) } -fn build_json(json: &Value, builder: &mut VariantBuilder) -> Result<(), ArrowError> { +fn build_json(json: &Value, builder: &mut impl VariantBuilderExt) -> Result<(), ArrowError> { append_json(json, builder)?; Ok(()) } @@ -99,10 +105,7 @@ fn variant_from_number<'m, 'v>(n: &Number) -> Result, ArrowError } } -fn append_json<'m, 'v>( - json: &'v Value, - builder: &mut impl VariantBuilderExt<'m, 'v>, -) -> Result<(), ArrowError> { +fn append_json(json: &Value, builder: &mut impl VariantBuilderExt) -> Result<(), ArrowError> { match json { Value::Null => builder.append_value(Variant::Null), Value::Bool(b) => builder.append_value(*b), @@ -137,8 +140,8 @@ struct ObjectFieldBuilder<'o, 'v, 's> { builder: &'o mut ObjectBuilder<'v>, } -impl<'m, 'v> VariantBuilderExt<'m, 'v> for ObjectFieldBuilder<'_, '_, '_> { - fn append_value(&mut self, value: impl Into>) { +impl VariantBuilderExt for ObjectFieldBuilder<'_, '_, '_> { + fn append_value<'m, 'v>(&mut self, value: impl Into>) { self.builder.insert(self.key, value); } diff --git a/parquet-variant/src/builder.rs b/parquet-variant/src/builder.rs index d0eb4872e442..800eec2013b8 100644 --- a/parquet-variant/src/builder.rs +++ b/parquet-variant/src/builder.rs @@ -1324,16 +1324,16 @@ impl Drop for ObjectBuilder<'_> { /// /// Allows users to append values to a [`VariantBuilder`], [`ListBuilder`] or /// [`ObjectBuilder`]. using the same interface. -pub trait VariantBuilderExt<'m, 'v> { - fn append_value(&mut self, value: impl Into>); +pub trait VariantBuilderExt { + fn append_value<'m, 'v>(&mut self, value: impl Into>); fn new_list(&mut self) -> ListBuilder; fn new_object(&mut self) -> ObjectBuilder; } -impl<'m, 'v> VariantBuilderExt<'m, 'v> for ListBuilder<'_> { - fn append_value(&mut self, value: impl Into>) { +impl VariantBuilderExt for ListBuilder<'_> { + fn append_value<'m, 'v>(&mut self, value: impl Into>) { self.append_value(value); } @@ -1346,8 +1346,8 @@ impl<'m, 'v> VariantBuilderExt<'m, 'v> for ListBuilder<'_> { } } -impl<'m, 'v> VariantBuilderExt<'m, 'v> for VariantBuilder { - fn append_value(&mut self, value: impl Into>) { +impl VariantBuilderExt for VariantBuilder { + fn append_value<'m, 'v>(&mut self, value: impl Into>) { self.append_value(value); }