diff --git a/src/handlers/http/ingest.rs b/src/handlers/http/ingest.rs index 9f72000b4..0b1eeb6f6 100644 --- a/src/handlers/http/ingest.rs +++ b/src/handlers/http/ingest.rs @@ -293,9 +293,7 @@ mod tests { use std::{collections::HashMap, sync::Arc}; use actix_web::test::TestRequest; - use arrow_array::{ - types::Int64Type, ArrayRef, Float64Array, Int64Array, ListArray, StringArray, - }; + use arrow_array::{ArrayRef, Float64Array, Int64Array, StringArray}; use arrow_schema::{DataType, Field}; use serde_json::json; @@ -689,25 +687,14 @@ mod tests { ]) ); - let c_a = vec![None, None, Some(vec![Some(1i64)]), Some(vec![Some(1)])]; - let c_b = vec![None, None, None, Some(vec![Some(2i64)])]; - assert_eq!( - rb.column_by_name("c_a") - .unwrap() - .as_any() - .downcast_ref::() - .unwrap(), - &ListArray::from_iter_primitive::(c_a) + rb.column_by_name("c_a").unwrap().as_int64_arr(), + &Int64Array::from(vec![None, None, Some(1), Some(1)]) ); assert_eq!( - rb.column_by_name("c_b") - .unwrap() - .as_any() - .downcast_ref::() - .unwrap(), - &ListArray::from_iter_primitive::(c_b) + rb.column_by_name("c_b").unwrap().as_int64_arr(), + &Int64Array::from(vec![None, None, None, Some(2)]) ); } } diff --git a/src/utils/json/flatten.rs b/src/utils/json/flatten.rs index 6e84ab0b7..43d12482d 100644 --- a/src/utils/json/flatten.rs +++ b/src/utils/json/flatten.rs @@ -304,6 +304,67 @@ pub fn flatten_array_objects( Ok(()) } +pub fn flatten_json(value: &Value) -> Vec { + match value { + Value::Array(arr) => { + let mut results = Vec::new(); + for item in arr { + results.extend(flatten_json(item)); + } + results + } + Value::Object(map) => { + let mut results = vec![map.clone()]; + for (key, val) in map { + if matches!(val, Value::Array(_)) { + if let Value::Array(arr) = val { + let mut new_results = Vec::new(); + for item in arr { + let flattened_items = flatten_json(item); + for flattened_item in flattened_items { + for result in &results { + let mut new_obj = result.clone(); + new_obj.insert(key.clone(), flattened_item.clone()); + new_results.push(new_obj); + } + } + } + results = new_results; + } + } else if matches!(val, Value::Object(_)) { + let nested_results = flatten_json(val); + let mut new_results = Vec::new(); + for nested_result in nested_results { + for result in &results { + let mut new_obj = result.clone(); + new_obj.insert(key.clone(), nested_result.clone()); + new_results.push(new_obj); + } + } + results = new_results; + } + } + results.into_iter().map(Value::Object).collect() + } + _ => vec![value.clone()], + } +} + +pub fn convert_to_array(flattened: Vec) -> Result { + let mut result = Vec::new(); + for item in flattened { + let mut map = Map::new(); + if let Some(item) = item.as_object() { + for (key, value) in item { + map.insert(key.clone(), value.clone()); + } + result.push(Value::Object(map)); + } else { + return Err(anyhow!("Expected object in array of objects")); + } + } + Ok(Value::Array(result)) +} #[cfg(test)] mod tests { use crate::utils::json::flatten::flatten_array_objects; diff --git a/src/utils/json/mod.rs b/src/utils/json/mod.rs index 526fb532f..263a951e5 100644 --- a/src/utils/json/mod.rs +++ b/src/utils/json/mod.rs @@ -28,14 +28,17 @@ pub fn flatten_json_body( custom_partition: Option, validation_required: bool, ) -> Result { - flatten::flatten( - body, - "_", - time_partition, - time_partition_limit, - custom_partition, - validation_required, - ) + match flatten::convert_to_array(flatten::flatten_json(&body)) { + Ok(nested_value) => flatten::flatten( + nested_value, + "_", + time_partition, + time_partition_limit, + custom_partition, + validation_required, + ), + Err(err) => Err(err), + } } pub fn convert_array_to_object(