From 82adac4a4f452def5efdbfcf53a2b4e676b7e5e9 Mon Sep 17 00:00:00 2001 From: jordanrfrazier <122494242+jordanrfrazier@users.noreply.github.com> Date: Thu, 27 Jul 2023 14:14:25 -0700 Subject: [PATCH] feat: `index` for DataType::List (#562) Adds type inference support for lists and adds just the `index` function. Further additions: https://github.com/kaskada-ai/kaskada/issues/494 --- .../src/kaskada/v1alpha/schema_traits.rs | 49 +++++- crates/sparrow-arrow/src/downcast.rs | 10 +- crates/sparrow-compiler/src/ast_to_dfg.rs | 15 ++ crates/sparrow-compiler/src/dfg.rs | 1 - .../src/functions/collection.rs | 5 + .../src/functions/pushdown.rs | 1 + .../src/plan/operation_to_plan.rs | 14 +- .../sparrow-compiler/src/types/inference.rs | 41 ++++- ...er_golden_tests__lookup_diff_grouping.snap | 14 +- ...er_golden_tests__lookup_same_grouping.snap | 14 +- ...ts__lookup_same_grouping_with_slicing.snap | 14 +- crates/sparrow-instructions/src/evaluators.rs | 3 + .../src/evaluators/list.rs | 2 + .../src/evaluators/list/index.rs | 141 ++++++++++++++++ crates/sparrow-main/tests/e2e/list_tests.rs | 154 ++++++++++++++++++ crates/sparrow-main/tests/e2e/main.rs | 1 + crates/sparrow-plan/src/inst.rs | 2 + .../src/execute/operation/lookup_request.rs | 12 +- .../src/execute/operation/lookup_response.rs | 7 +- .../src/execute/operation/spread.rs | 8 +- crates/sparrow-syntax/src/syntax/fenl_type.rs | 5 +- proto/kaskada/kaskada/v1alpha/schema.proto | 8 +- testdata/parquet/data_with_list.parquet | Bin 0 -> 2865 bytes 23 files changed, 470 insertions(+), 51 deletions(-) create mode 100644 crates/sparrow-instructions/src/evaluators/list.rs create mode 100644 crates/sparrow-instructions/src/evaluators/list/index.rs create mode 100644 crates/sparrow-main/tests/e2e/list_tests.rs create mode 100644 testdata/parquet/data_with_list.parquet diff --git a/crates/sparrow-api/src/kaskada/v1alpha/schema_traits.rs b/crates/sparrow-api/src/kaskada/v1alpha/schema_traits.rs index 762a95a99..e444bb23e 100644 --- a/crates/sparrow-api/src/kaskada/v1alpha/schema_traits.rs +++ b/crates/sparrow-api/src/kaskada/v1alpha/schema_traits.rs @@ -24,7 +24,7 @@ impl DataType { let value = &fields[1]; Self { kind: Some(data_type::Kind::Map(Box::new(data_type::Map { - name: name.to_string(), + name: name.to_owned(), ordered, key_name: key.name.clone(), key_type: Some(Box::new( @@ -44,6 +44,22 @@ impl DataType { } } + pub fn new_list(name: &str, field: schema::Field) -> Self { + Self { + kind: Some(data_type::Kind::List(Box::new(data_type::List { + name: name.to_owned(), + item_type: Some(Box::new( + field + .data_type + .as_ref() + .expect("data type to exist") + .clone(), + )), + nullable: field.nullable, + }))), + } + } + pub fn new_primitive(primitive: data_type::PrimitiveType) -> Self { Self { kind: Some(data_type::Kind::Primitive(primitive as i32)), @@ -221,6 +237,21 @@ impl TryFrom<&arrow::datatypes::DataType> for DataType { Ok(DataType::new_map(s.name(), *is_ordered, vec![key, value])) } + arrow::datatypes::DataType::List(field) => { + let name = field.name(); + let field = schema::Field { + name: name.to_owned(), + data_type: Some(field.data_type().try_into().map_err( + |err: ConversionError| { + err.with_prepend_field("list item".to_owned()) + }, + )?), + nullable: field.is_nullable(), + }; + + Ok(DataType::new_list(name, field)) + } + unsupported => Err(ConversionError::new_unsupported(unsupported.clone())), } } @@ -337,12 +368,16 @@ impl TryFrom<&DataType> for arrow::datatypes::DataType { Some(data_type::Kind::Struct(schema)) => Ok(arrow::datatypes::DataType::Struct( fields_to_arrow(&schema.fields)?.into(), )), - Some(data_type::Kind::List(item_type)) => { - let item_type = arrow::datatypes::DataType::try_from(item_type.as_ref()) - .map_err(|e| e.with_prepend_field("list item".to_owned()))?; - let item_type = arrow::datatypes::Field::new("item", item_type, true); - Ok(arrow::datatypes::DataType::List(Arc::new(item_type))) - } + Some(data_type::Kind::List(list)) => match list.item_type.as_ref() { + Some(item_type) => { + let item_type = arrow::datatypes::DataType::try_from(item_type.as_ref()) + .map_err(|e| e.with_prepend_field("list item".to_owned()))?; + let item_type = + arrow::datatypes::Field::new(list.name.clone(), item_type, list.nullable); + Ok(arrow::datatypes::DataType::List(Arc::new(item_type))) + } + None => Err(ConversionError::new_unsupported(value.clone())), + }, Some(data_type::Kind::Map(map)) => { match (map.key_type.as_ref(), map.value_type.as_ref()) { (Some(key), Some(value)) => { diff --git a/crates/sparrow-arrow/src/downcast.rs b/crates/sparrow-arrow/src/downcast.rs index ce0eabada..39b263ddc 100644 --- a/crates/sparrow-arrow/src/downcast.rs +++ b/crates/sparrow-arrow/src/downcast.rs @@ -2,8 +2,7 @@ use anyhow::Context; use arrow::array::{ - Array, BooleanArray, GenericStringArray, ListArray, OffsetSizeTrait, PrimitiveArray, - StructArray, + Array, BooleanArray, GenericStringArray, OffsetSizeTrait, PrimitiveArray, StructArray, }; use arrow::datatypes::ArrowPrimitiveType; use arrow_array::MapArray; @@ -24,13 +23,6 @@ pub fn downcast_primitive_array( }) } -pub fn downcast_list_array(array: &dyn Array) -> anyhow::Result<&ListArray> { - array - .as_any() - .downcast_ref::() - .with_context(|| format!("Unable to downcast {:?} to ListArray", array.data_type())) -} - /// Downcast an array into a string array. pub fn downcast_string_array(array: &dyn Array) -> anyhow::Result<&GenericStringArray> where diff --git a/crates/sparrow-compiler/src/ast_to_dfg.rs b/crates/sparrow-compiler/src/ast_to_dfg.rs index 020f4be19..58b9f477a 100644 --- a/crates/sparrow-compiler/src/ast_to_dfg.rs +++ b/crates/sparrow-compiler/src/ast_to_dfg.rs @@ -674,6 +674,13 @@ fn cast_if_needed( { Ok(value) } + // Ensures that list types with the same inner types are compatible, regardless of the (arbitary) field naming. + (FenlType::Concrete(DataType::List(s)), FenlType::Concrete(DataType::List(s2))) + if list_types_are_equal(s, s2) => + { + Ok(value) + } + (FenlType::Concrete(DataType::Null), FenlType::Window) => Ok(value), ( FenlType::Concrete(DataType::Struct(actual_fields)), @@ -733,6 +740,14 @@ fn map_types_are_equal(a: &FieldRef, b: &FieldRef) -> bool { } } +// When constructing the concrete list during inference, we use arbitary names for the inner data +// field since we don't have access to the user's naming patterns there. +// By comparing the list types based on just the inner type, we can ensure that the types are +// still treated as equal. +fn list_types_are_equal(a: &FieldRef, b: &FieldRef) -> bool { + a.data_type() == b.data_type() +} + pub(crate) fn is_any_new(dfg: &mut Dfg, arguments: &[Located]) -> anyhow::Result { let mut argument_is_new = arguments.iter().map(|a| a.is_new()).unique(); let mut result = argument_is_new diff --git a/crates/sparrow-compiler/src/dfg.rs b/crates/sparrow-compiler/src/dfg.rs index 6c535e25b..a5fe9cf9b 100644 --- a/crates/sparrow-compiler/src/dfg.rs +++ b/crates/sparrow-compiler/src/dfg.rs @@ -108,7 +108,6 @@ impl Default for Dfg { impl Dfg { pub(super) fn add_literal(&mut self, literal: impl Into) -> anyhow::Result { let literal = literal.into(); - // TODO: FRAZ - do I need to support large string literal here? if let ScalarValue::Utf8(Some(literal)) = literal { self.add_string_literal(&literal) } else { diff --git a/crates/sparrow-compiler/src/functions/collection.rs b/crates/sparrow-compiler/src/functions/collection.rs index 2b5d29b4c..78f003c09 100644 --- a/crates/sparrow-compiler/src/functions/collection.rs +++ b/crates/sparrow-compiler/src/functions/collection.rs @@ -7,4 +7,9 @@ pub(super) fn register(registry: &mut Registry) { .register("get(key: K, map: map) -> V") .with_implementation(Implementation::Instruction(InstOp::Get)) .set_internal(); + + registry + .register("index(i: i64, list: list) -> T") + .with_implementation(Implementation::Instruction(InstOp::Index)) + .set_internal(); } diff --git a/crates/sparrow-compiler/src/functions/pushdown.rs b/crates/sparrow-compiler/src/functions/pushdown.rs index 7d9f1e4e6..589899334 100644 --- a/crates/sparrow-compiler/src/functions/pushdown.rs +++ b/crates/sparrow-compiler/src/functions/pushdown.rs @@ -111,6 +111,7 @@ impl Pushdown { | DataType::Interval(_) | DataType::Utf8 | DataType::LargeUtf8 + | DataType::List(..) | DataType::Map(..) => { let mut subst = subst.clone(); subst.insert( diff --git a/crates/sparrow-compiler/src/plan/operation_to_plan.rs b/crates/sparrow-compiler/src/plan/operation_to_plan.rs index 0d5de71de..6ba346a24 100644 --- a/crates/sparrow-compiler/src/plan/operation_to_plan.rs +++ b/crates/sparrow-compiler/src/plan/operation_to_plan.rs @@ -21,10 +21,16 @@ use crate::DataContext; /// DataType protobuf representing a list of u64. #[static_init::dynamic] static LIST_U64_DATA_TYPE: DataType = DataType { - kind: Some(data_type::Kind::List(Box::new(DataType { - kind: Some(data_type::Kind::Primitive( - data_type::PrimitiveType::U64 as i32, - )), + kind: Some(data_type::Kind::List(Box::new(data_type::List { + // Note: The fields here must match the default fields used when creating + // types during type inference, otherwise schema validation will fail. + name: "item".to_owned(), + item_type: Some(Box::new(DataType { + kind: Some(data_type::Kind::Primitive( + data_type::PrimitiveType::U64 as i32, + )), + })), + nullable: true, }))), }; diff --git a/crates/sparrow-compiler/src/types/inference.rs b/crates/sparrow-compiler/src/types/inference.rs index 38d4e7d8b..d64c02331 100644 --- a/crates/sparrow-compiler/src/types/inference.rs +++ b/crates/sparrow-compiler/src/types/inference.rs @@ -210,8 +210,30 @@ pub fn validate_instantiation( } } } - FenlType::Collection(Collection::List, _) => { - todo!("list unsupported") + FenlType::Collection(Collection::List, type_vars) => { + debug_assert!(type_vars.len() == 1); + let item_type = match argument_type { + FenlType::Concrete(DataType::List(f)) => { + FenlType::Concrete(f.data_type().clone()) + } + other => anyhow::bail!("expected list, saw {:?}", other), + }; + + match types_for_variable.entry(type_vars[0].clone()) { + Entry::Occupied(occupied) => { + anyhow::ensure!( + occupied.get() == &item_type + || matches!(occupied.get(), FenlType::Error) + || matches!(argument_type, FenlType::Error), + "Failed type validation: expected {} but was {}", + occupied.get(), + item_type + ); + } + Entry::Vacant(vacant) => { + vacant.insert(item_type.clone()); + } + } } FenlType::Error => { // Assume the argument matches (since we already reported what @@ -359,7 +381,20 @@ fn instantiate_type(fenl_type: &FenlType, solutions: &HashMap todo!("unsupported"), + FenlType::Collection(Collection::List, type_vars) => { + debug_assert!(type_vars.len() == 1); + let concrete_type = solutions + .get(&type_vars[0]) + .cloned() + .unwrap_or(FenlType::Concrete(DataType::Null)); + let field = match concrete_type { + // TODO: Should the fields be nullable? + FenlType::Concrete(t) => Arc::new(Field::new("item", t, false)), + other => panic!("expected concrete type, got {:?}", other), + }; + + FenlType::Concrete(DataType::List(field)) + } FenlType::Concrete(_) => fenl_type.clone(), FenlType::Window => fenl_type.clone(), FenlType::Json => fenl_type.clone(), diff --git a/crates/sparrow-compiler/tests/snapshots/compiler_golden_tests__lookup_diff_grouping.snap b/crates/sparrow-compiler/tests/snapshots/compiler_golden_tests__lookup_diff_grouping.snap index 87f4a49cf..88330bdf2 100644 --- a/crates/sparrow-compiler/tests/snapshots/compiler_golden_tests__lookup_diff_grouping.snap +++ b/crates/sparrow-compiler/tests/snapshots/compiler_golden_tests__lookup_diff_grouping.snap @@ -227,8 +227,11 @@ operations: result_type: kind: List: - kind: - Primitive: 10 + name: item + item_type: + kind: + Primitive: 10 + nullable: true output: true operator: Input: @@ -251,8 +254,11 @@ operations: result_type: kind: List: - kind: - Primitive: 10 + name: item + item_type: + kind: + Primitive: 10 + nullable: true output: true operator: Input: diff --git a/crates/sparrow-compiler/tests/snapshots/compiler_golden_tests__lookup_same_grouping.snap b/crates/sparrow-compiler/tests/snapshots/compiler_golden_tests__lookup_same_grouping.snap index d9f029d3a..9970a70fb 100644 --- a/crates/sparrow-compiler/tests/snapshots/compiler_golden_tests__lookup_same_grouping.snap +++ b/crates/sparrow-compiler/tests/snapshots/compiler_golden_tests__lookup_same_grouping.snap @@ -217,8 +217,11 @@ operations: result_type: kind: List: - kind: - Primitive: 10 + name: item + item_type: + kind: + Primitive: 10 + nullable: true output: true operator: Input: @@ -241,8 +244,11 @@ operations: result_type: kind: List: - kind: - Primitive: 10 + name: item + item_type: + kind: + Primitive: 10 + nullable: true output: true operator: Input: diff --git a/crates/sparrow-compiler/tests/snapshots/compiler_golden_tests__lookup_same_grouping_with_slicing.snap b/crates/sparrow-compiler/tests/snapshots/compiler_golden_tests__lookup_same_grouping_with_slicing.snap index c1d5b07ba..72ae1e7ef 100644 --- a/crates/sparrow-compiler/tests/snapshots/compiler_golden_tests__lookup_same_grouping_with_slicing.snap +++ b/crates/sparrow-compiler/tests/snapshots/compiler_golden_tests__lookup_same_grouping_with_slicing.snap @@ -219,8 +219,11 @@ operations: result_type: kind: List: - kind: - Primitive: 10 + name: item + item_type: + kind: + Primitive: 10 + nullable: true output: true operator: Input: @@ -243,8 +246,11 @@ operations: result_type: kind: List: - kind: - Primitive: 10 + name: item + item_type: + kind: + Primitive: 10 + nullable: true output: true operator: Input: diff --git a/crates/sparrow-instructions/src/evaluators.rs b/crates/sparrow-instructions/src/evaluators.rs index 84276df3f..770e923cf 100644 --- a/crates/sparrow-instructions/src/evaluators.rs +++ b/crates/sparrow-instructions/src/evaluators.rs @@ -16,6 +16,7 @@ mod equality; mod field_ref; mod general; mod json_field; +mod list; mod logical; mod macros; mod map; @@ -31,6 +32,7 @@ use equality::*; use field_ref::*; use general::*; use json_field::*; +use list::*; use logical::*; use map::*; use math::*; @@ -204,6 +206,7 @@ fn create_simple_evaluator( } InstOp::Floor => FloorEvaluator::try_new(info), InstOp::Get => GetEvaluator::try_new(info), + InstOp::Index => IndexEvaluator::try_new(info), InstOp::Gt => match (info.args[0].is_literal(), info.args[1].is_literal()) { (_, true) => { create_ordered_evaluator!(&info.args[0].data_type, GtScalarEvaluator, info) diff --git a/crates/sparrow-instructions/src/evaluators/list.rs b/crates/sparrow-instructions/src/evaluators/list.rs new file mode 100644 index 000000000..c1c37f4f9 --- /dev/null +++ b/crates/sparrow-instructions/src/evaluators/list.rs @@ -0,0 +1,2 @@ +mod index; +pub(super) use index::*; diff --git a/crates/sparrow-instructions/src/evaluators/list/index.rs b/crates/sparrow-instructions/src/evaluators/list/index.rs new file mode 100644 index 000000000..718b6ddb6 --- /dev/null +++ b/crates/sparrow-instructions/src/evaluators/list/index.rs @@ -0,0 +1,141 @@ +use anyhow::Context; +use arrow::array::{Array, ArrayRef, AsArray, Int32Array, Int64Array, ListArray}; + +use arrow_schema::DataType; +use itertools::Itertools; +use sparrow_plan::ValueRef; +use std::sync::Arc; + +use crate::{Evaluator, EvaluatorFactory, StaticInfo}; + +/// Evaluator for `index` on lists. +/// +/// Retrieves the value at the given index. +#[derive(Debug)] +pub(in crate::evaluators) struct IndexEvaluator { + index: ValueRef, + list: ValueRef, +} + +impl EvaluatorFactory for IndexEvaluator { + fn try_new(info: StaticInfo<'_>) -> anyhow::Result> { + let input_type = info.args[1].data_type.clone(); + match input_type { + DataType::List(t) => anyhow::ensure!(t.data_type() == info.result_type), + other => anyhow::bail!("expected list type, saw {:?}", other), + }; + + let (index, list) = info.unpack_arguments()?; + Ok(Box::new(Self { index, list })) + } +} + +impl Evaluator for IndexEvaluator { + fn evaluate(&mut self, info: &dyn crate::RuntimeInfo) -> anyhow::Result { + let list_input = info.value(&self.list)?.array_ref()?; + let index_input = info.value(&self.index)?.primitive_array()?; + + let result = list_get(&list_input, &index_input)?; + Ok(Arc::new(result)) + } +} + +/// Given a `ListArray` and `index` array of the same length return an array of the values. +fn list_get(list: &ArrayRef, indices: &Int64Array) -> anyhow::Result { + anyhow::ensure!(list.len() == indices.len()); + + let list = list.as_list(); + let take_indices = list_indices(list, indices)?; + arrow::compute::take(list.values(), &take_indices, None).context("take in get_map") +} + +/// Gets the indices in the list where the values are at the index within each list. +fn list_indices(list: &ListArray, indices: &Int64Array) -> anyhow::Result { + let offsets = list.offsets(); + + let mut result = Int32Array::builder(indices.len()); + let offsets = offsets.iter().map(|n| *n as usize).tuple_windows(); + + 'outer: for (index, (start, next)) in offsets.enumerate() { + let list_start = 0; + let list_end = next - start; + if indices.is_valid(index) { + // The inner index corresponds to the index within each list. + let inner_index = indices.value(index) as usize; + // The outer index corresponds to the index with the flattened array. + let outer_index = start + inner_index; + if inner_index >= list_start && inner_index < list_end { + result.append_value(outer_index as i32); + continue 'outer; + } + } + result.append_null(); + } + + Ok(result.finish()) +} + +#[cfg(test)] +mod tests { + use crate::evaluators::list::index::list_get; + use arrow::array::{ + as_boolean_array, as_primitive_array, as_string_array, ArrayRef, BooleanArray, + BooleanBuilder, Int32Array, Int32Builder, Int64Array, ListBuilder, StringArray, + StringBuilder, + }; + use std::sync::Arc; + + #[test] + fn test_index_primitive() { + let mut builder = ListBuilder::new(Int32Builder::new()); + builder.append_value([Some(1), Some(2), Some(3)]); + builder.append_value([]); + builder.append_value([None]); + builder.append_value([Some(10), Some(8), Some(4)]); + builder.append_value([Some(10), Some(15), Some(19), Some(123)]); + + let array: ArrayRef = Arc::new(builder.finish()); + + let indices = Int64Array::from(vec![0, 1, 2, 0, 1]); + let actual = list_get(&array, &indices).unwrap(); + let actual: &Int32Array = as_primitive_array(actual.as_ref()); + let expected = Int32Array::from(vec![Some(1), None, None, Some(10), Some(15)]); + assert_eq!(actual, &expected); + } + + #[test] + fn test_index_string() { + let mut builder = ListBuilder::new(StringBuilder::new()); + builder.append_value([Some("hello"), None, Some("world")]); + builder.append_value([Some("apple")]); + builder.append_value([None, Some("carrot")]); + builder.append_value([None, Some("dog"), Some("cat")]); + builder.append_value([Some("bird"), Some("fish")]); + + let array: ArrayRef = Arc::new(builder.finish()); + + let indices = Int64Array::from(vec![0, 1, 2, 0, 1]); + let actual = list_get(&array, &indices).unwrap(); + let actual: &StringArray = as_string_array(actual.as_ref()); + let expected = StringArray::from(vec![Some("hello"), None, None, None, Some("fish")]); + assert_eq!(actual, &expected); + } + + #[test] + fn test_index_boolean() { + let mut builder = ListBuilder::new(BooleanBuilder::new()); + builder.append_value([Some(true), None, Some(false)]); + builder.append_value([Some(false)]); + builder.append_value([None, Some(false)]); + builder.append_value([None, Some(true), Some(false)]); + builder.append_value([Some(true), Some(false)]); + + let array: ArrayRef = Arc::new(builder.finish()); + + let indices = Int64Array::from(vec![0, 1, 2, 0, 1]); + let actual = list_get(&array, &indices).unwrap(); + let actual: &BooleanArray = as_boolean_array(actual.as_ref()); + let expected = BooleanArray::from(vec![Some(true), None, None, None, Some(false)]); + assert_eq!(actual, &expected); + } +} diff --git a/crates/sparrow-main/tests/e2e/list_tests.rs b/crates/sparrow-main/tests/e2e/list_tests.rs new file mode 100644 index 000000000..13d0921e4 --- /dev/null +++ b/crates/sparrow-main/tests/e2e/list_tests.rs @@ -0,0 +1,154 @@ +//! e2e tests for list types + +use sparrow_api::kaskada::v1alpha::TableConfig; +use uuid::Uuid; + +use crate::{fixture::DataFixture, QueryFixture}; + +/// Create a simple table with a collection type (map). +pub(crate) async fn list_data_fixture() -> DataFixture { + DataFixture::new() + .with_table_from_files( + TableConfig::new_with_table_source( + "Input", + &Uuid::new_v4(), + "time", + Some("subsort"), + "key", + "", + ), + &["parquet/data_with_list.parquet"], + ) + .await + .unwrap() +} + +#[tokio::test] +async fn test_index_list_i64_static() { + insta::assert_snapshot!(QueryFixture::new("{ f1: Input.i64_list | index(1) }").run_to_csv(&list_data_fixture().await).await.unwrap(), @r###" + _time,_subsort,_key_hash,_key,f1 + 1996-12-19T16:39:57.000000000,0,18433805721903975440,1,2 + 1996-12-19T16:40:57.000000000,0,18433805721903975440,1,2 + 1996-12-19T16:40:59.000000000,0,18433805721903975440,1,2 + 1996-12-19T16:41:57.000000000,0,18433805721903975440,1,2 + 1996-12-19T16:42:57.000000000,0,18433805721903975440,1,2 + "###); +} + +#[tokio::test] +async fn test_index_list_i64_dynamic() { + insta::assert_snapshot!(QueryFixture::new("{ f1: Input.i64_list | index(Input.index) }").run_to_csv(&list_data_fixture().await).await.unwrap(), @r###" + _time,_subsort,_key_hash,_key,f1 + 1996-12-19T16:39:57.000000000,0,18433805721903975440,1,1 + 1996-12-19T16:40:57.000000000,0,18433805721903975440,1,3 + 1996-12-19T16:40:59.000000000,0,18433805721903975440,1,2 + 1996-12-19T16:41:57.000000000,0,18433805721903975440,1,3 + 1996-12-19T16:42:57.000000000,0,18433805721903975440,1,1 + "###); +} + +#[tokio::test] +async fn test_index_list_string_static() { + insta::assert_snapshot!(QueryFixture::new("{ f1: Input.string_list | index(1) }").run_to_csv(&list_data_fixture().await).await.unwrap(), @r###" + _time,_subsort,_key_hash,_key,f1 + 1996-12-19T16:39:57.000000000,0,18433805721903975440,1,bird + 1996-12-19T16:40:57.000000000,0,18433805721903975440,1,bird + 1996-12-19T16:40:59.000000000,0,18433805721903975440,1, + 1996-12-19T16:41:57.000000000,0,18433805721903975440,1,cat + 1996-12-19T16:42:57.000000000,0,18433805721903975440,1, + "###); +} + +#[tokio::test] +async fn test_index_list_string_dynamic() { + insta::assert_snapshot!(QueryFixture::new("{ f1: Input.string_list | index(Input.index) }").run_to_csv(&list_data_fixture().await).await.unwrap(), @r###" + _time,_subsort,_key_hash,_key,f1 + 1996-12-19T16:39:57.000000000,0,18433805721903975440,1,dog + 1996-12-19T16:40:57.000000000,0,18433805721903975440,1,fish + 1996-12-19T16:40:59.000000000,0,18433805721903975440,1, + 1996-12-19T16:41:57.000000000,0,18433805721903975440,1, + 1996-12-19T16:42:57.000000000,0,18433805721903975440,1,dog + "###); +} + +#[tokio::test] +async fn test_index_list_bool_static() { + insta::assert_snapshot!(QueryFixture::new("{ f1: Input.bool_list | index(1) }").run_to_csv(&list_data_fixture().await).await.unwrap(), @r###" + _time,_subsort,_key_hash,_key,f1 + 1996-12-19T16:39:57.000000000,0,18433805721903975440,1,true + 1996-12-19T16:40:57.000000000,0,18433805721903975440,1,false + 1996-12-19T16:40:59.000000000,0,18433805721903975440,1,false + 1996-12-19T16:41:57.000000000,0,18433805721903975440,1,false + 1996-12-19T16:42:57.000000000,0,18433805721903975440,1, + "###); +} + +#[tokio::test] +async fn test_index_list_bool_dynamic() { + insta::assert_snapshot!(QueryFixture::new("{ f1: Input.bool_list | index(Input.index) }").run_to_csv(&list_data_fixture().await).await.unwrap(), @r###" + _time,_subsort,_key_hash,_key,f1 + 1996-12-19T16:39:57.000000000,0,18433805721903975440,1,false + 1996-12-19T16:40:57.000000000,0,18433805721903975440,1, + 1996-12-19T16:40:59.000000000,0,18433805721903975440,1,false + 1996-12-19T16:41:57.000000000,0,18433805721903975440,1,true + 1996-12-19T16:42:57.000000000,0,18433805721903975440,1,true + "###); +} + +#[tokio::test] +async fn test_incorrect_index_type() { + insta::assert_yaml_snapshot!(QueryFixture::new("{ f1: Input.i64_list | index(\"s\") }") + .run_to_csv(&list_data_fixture().await).await.unwrap_err(), @r###" + --- + code: Client specified an invalid argument + message: 1 errors in Fenl statements; see diagnostics + fenl_diagnostics: + - severity: error + code: E0010 + message: Invalid argument type(s) + formatted: + - "error[E0010]: Invalid argument type(s)" + - " --> Query:1:24" + - " |" + - "1 | { f1: Input.i64_list | index(\"s\") }" + - " | ^^^^^ --- Actual type: string" + - " | | " + - " | Invalid types for parameter 'i' in call to 'index'" + - " |" + - " --> built-in signature 'index(i: i64, list: list) -> T':1:18" + - " |" + - "1 | index(i: i64, list: list) -> T" + - " | --- Expected type: i64" + - "" + - "" + "###); +} + +#[tokio::test] +async fn test_incorrect_index_type_field() { + insta::assert_yaml_snapshot!(QueryFixture::new("{ f1: Input.i64_list | index(Input.bool_list) }") + .run_to_csv(&list_data_fixture().await).await.unwrap_err(), @r###" + --- + code: Client specified an invalid argument + message: 1 errors in Fenl statements; see diagnostics + fenl_diagnostics: + - severity: error + code: E0010 + message: Invalid argument type(s) + formatted: + - "error[E0010]: Invalid argument type(s)" + - " --> Query:1:24" + - " |" + - "1 | { f1: Input.i64_list | index(Input.bool_list) }" + - " | ^^^^^ --------------- Actual type: list" + - " | | " + - " | Invalid types for parameter 'i' in call to 'index'" + - " |" + - " --> built-in signature 'index(i: i64, list: list) -> T':1:18" + - " |" + - "1 | index(i: i64, list: list) -> T" + - " | --- Expected type: i64" + - "" + - "" + "###); +} diff --git a/crates/sparrow-main/tests/e2e/main.rs b/crates/sparrow-main/tests/e2e/main.rs index c629a920e..84ecde202 100644 --- a/crates/sparrow-main/tests/e2e/main.rs +++ b/crates/sparrow-main/tests/e2e/main.rs @@ -26,6 +26,7 @@ mod equality_tests; mod formula_tests; mod general_tests; mod json_tests; +mod list_tests; mod logical_tests; mod lookup_tests; mod map_tests; diff --git a/crates/sparrow-plan/src/inst.rs b/crates/sparrow-plan/src/inst.rs index 5bf2e9fb4..5fcd8ad53 100644 --- a/crates/sparrow-plan/src/inst.rs +++ b/crates/sparrow-plan/src/inst.rs @@ -103,6 +103,8 @@ pub enum InstOp { Hash, #[strum(props(signature = "if(condition: bool, value: T) -> T"))] If, + #[strum(props(signature = "index(i: i64, list: list) -> T"))] + Index, #[strum(props(signature = "is_valid(input: T) -> bool"))] IsValid, // HACK: This instruction does not show up in the plan/does not have an evaluator. diff --git a/crates/sparrow-runtime/src/execute/operation/lookup_request.rs b/crates/sparrow-runtime/src/execute/operation/lookup_request.rs index 3fc5bc673..af8010446 100644 --- a/crates/sparrow-runtime/src/execute/operation/lookup_request.rs +++ b/crates/sparrow-runtime/src/execute/operation/lookup_request.rs @@ -346,10 +346,14 @@ mod tests { expressions: vec![ExpressionPlan { arguments: vec![], result_type: Some(v1alpha::DataType { - kind: Some(data_type::Kind::List(Box::new(v1alpha::DataType { - kind: Some(data_type::Kind::Primitive( - data_type::PrimitiveType::U64 as i32, - )), + kind: Some(data_type::Kind::List(Box::new(data_type::List { + name: "item".to_owned(), + item_type: Some(Box::new(v1alpha::DataType { + kind: Some(data_type::Kind::Primitive( + data_type::PrimitiveType::U64 as i32, + )), + })), + nullable: true, }))), }), output: true, diff --git a/crates/sparrow-runtime/src/execute/operation/lookup_response.rs b/crates/sparrow-runtime/src/execute/operation/lookup_response.rs index f0b9949d2..854c7709a 100644 --- a/crates/sparrow-runtime/src/execute/operation/lookup_response.rs +++ b/crates/sparrow-runtime/src/execute/operation/lookup_response.rs @@ -1,13 +1,13 @@ use std::sync::Arc; use anyhow::Context; -use arrow::array::{Array, ListArray, UInt32Array, UInt64Array}; +use arrow::array::{Array, AsArray, ListArray, UInt32Array, UInt64Array}; use async_trait::async_trait; use error_stack::{IntoReport, IntoReportCompat, ResultExt}; use futures::StreamExt; use itertools::Itertools; use sparrow_api::kaskada::v1alpha::operation_plan; -use sparrow_arrow::downcast::{downcast_list_array, downcast_primitive_array}; +use sparrow_arrow::downcast::downcast_primitive_array; use sparrow_instructions::ComputeStore; use tokio_stream::wrappers::ReceiverStream; @@ -111,8 +111,7 @@ impl LookupResponseOperation { return Ok(None); } - let requesting_key_hash_list: &ListArray = - downcast_list_array(requesting_key_hash_list.as_ref())?; + let requesting_key_hash_list: &ListArray = requesting_key_hash_list.as_ref().as_list(); let requesting_key_hashes = requesting_key_hash_list.values(); let requesting_key_hashes: &UInt64Array = downcast_primitive_array(requesting_key_hashes.as_ref())?; diff --git a/crates/sparrow-runtime/src/execute/operation/spread.rs b/crates/sparrow-runtime/src/execute/operation/spread.rs index ce6467ef9..979cd74ae 100644 --- a/crates/sparrow-runtime/src/execute/operation/spread.rs +++ b/crates/sparrow-runtime/src/execute/operation/spread.rs @@ -3,7 +3,7 @@ use std::sync::Arc; use anyhow::Context; use arrow::array::{ - new_null_array, Array, ArrayData, ArrayRef, BooleanArray, BooleanBufferBuilder, + new_null_array, Array, ArrayData, ArrayRef, AsArray, BooleanArray, BooleanBufferBuilder, GenericStringArray, GenericStringBuilder, Int32BufferBuilder, ListArray, MapArray, OffsetSizeTrait, PrimitiveArray, PrimitiveBuilder, StringArray, StringBuilder, StructArray, }; @@ -11,8 +11,8 @@ use arrow::datatypes::{self, ArrowPrimitiveType, DataType, Fields}; use bitvec::vec::BitVec; use itertools::{izip, Itertools}; use sparrow_arrow::downcast::{ - downcast_boolean_array, downcast_list_array, downcast_map_array, downcast_primitive_array, - downcast_string_array, downcast_struct_array, + downcast_boolean_array, downcast_map_array, downcast_primitive_array, downcast_string_array, + downcast_struct_array, }; use sparrow_arrow::utils::make_null_array; use sparrow_instructions::GroupingIndices; @@ -1696,7 +1696,7 @@ impl SpreadImpl for UnlatchedUInt64ListSpread { // referenced exactly once). Instead, we need to spread out the offset // array. - let values = downcast_list_array(values.as_ref())?; + let values = values.as_ref().as_list(); let mut offset_builder = Int32BufferBuilder::new(grouping.len() + 1); let mut null_builder = BooleanBufferBuilder::new(grouping.len()); diff --git a/crates/sparrow-syntax/src/syntax/fenl_type.rs b/crates/sparrow-syntax/src/syntax/fenl_type.rs index de2c70cfa..18af9c4f3 100644 --- a/crates/sparrow-syntax/src/syntax/fenl_type.rs +++ b/crates/sparrow-syntax/src/syntax/fenl_type.rs @@ -82,6 +82,9 @@ impl<'a> std::fmt::Display for FormatDataType<'a> { write!(fmt, "{}", FormatStruct(fields)) } DataType::Date32 => fmt.write_str("date32"), + DataType::List(f) => { + write!(fmt, "list<{}>", FormatDataType(f.data_type())) + } DataType::Map(f, _) => match f.data_type() { DataType::Struct(fields) => { write!( @@ -243,8 +246,6 @@ impl FromStr for FenlType { "duration_ns" => Ok(DataType::Duration(TimeUnit::Nanosecond).into()), "window" => Ok(FenlType::Window), "json" => Ok(FenlType::Json), - // TODO(https://github.com/kaskada-ai/kaskada/issues/494): Support fenl types - // in collections s if s.starts_with("list<") && s.ends_with('>') => { let type_var = &s[5..s.len() - 1] .split(',') diff --git a/proto/kaskada/kaskada/v1alpha/schema.proto b/proto/kaskada/kaskada/v1alpha/schema.proto index c12ebe6bf..faa8cbd41 100644 --- a/proto/kaskada/kaskada/v1alpha/schema.proto +++ b/proto/kaskada/kaskada/v1alpha/schema.proto @@ -32,12 +32,18 @@ message DataType { google.protobuf.Empty window = 3; // A list of a different type. - DataType list = 4; + List list = 4; // A map type. Map map = 5; } + message List { + string name = 1; + DataType item_type = 2; + bool nullable = 3; + } + message Map { string name = 1; bool ordered = 2; diff --git a/testdata/parquet/data_with_list.parquet b/testdata/parquet/data_with_list.parquet new file mode 100644 index 0000000000000000000000000000000000000000..f4b0971f7cac52ead3d26bd94fb557f11fc85c9a GIT binary patch literal 2865 zcmb7GO>A355MDn&yN=@&$FQsSBrB?*dCDQgPMQPV(3|*|+nv z-+VheJ4!~bjB=G{cqYyLoP`);`N_jK%0Kq9uikmLg5Sk2{-ST_`x`BkHv_xx!Iv}G z;2(x5$i3Vf&iZVN=5sI@bGr|cjH~(t^9FTIZC195oC&}9&ILcCYOpZ4TafLIu@iI* zU?5g7%@vaB7_MC7KpX$%96tmE9d?RZF+1=k(Y zwrD?)Z3O$EQU(Y-7VZb z;8r~6=RY`ary%C5OpafzRvJx}rb@H81@s2QJs13pG+j$KXw28GqYD_|1H)WETBs6S zE{&)YflW6O5bg(XCVh~3^;l27R;zXg7a}7v2Aj@W+aVXzd^*j2lncb#sFhUsjLWr#e)hlY8`%E+WLq@rF^(HWeG%E!zDOV8iGyP4wNc!3AZs*LqEn)S+d zshd(reCvXrk%fFb2?_KxuGv=wxD3yU=LRx9?&~N>6)hTa#Q~uiSF8u&scdf)_o*_j zi|fWXCceWx*0E+x?^kpvPV0f<(s_%zaLi^DhGuI(;vKVHQ((52r0xop%~!1Dk?JfTd3n>)9`x{xwY=TSH?8HJ zKAyxSB=2Z0kgL~gt*ebfxww_%XQ2lSLW;4yR1ZVpX>bEa)Y*@HFr5DU*;lc_ks2Rl zTQ#9*&WnKd;!PvKgsRc&;0kV0G!5wWTyaWvn-%b z>S6tc@q;UDKisYk+BAIsql1c{`uTI>;aUC)Gk$irQSBr-E$Wurc&MDx6UDFnT!@dJ zL6C1eZIELkJEm{XSn@-kU!6iKYGLey;#x^p-=eXIcWj8va+=HviTIWFTD{QV{%4HxeR|eUYD{ZXG-!l zRKZuuIO#zIS;~2oKxKbHD!^YNiU@T%E9D%T6D{xTG)TFWc_|Xv$CX^C8K*La zPBbEWkqCj3f!t*rO{wbcmDK7us?!C9-atETN>!s*)}myYLD#v(XkiOtj!rdl*~O@wzXYB!;_K{RZV?|dYshD&hIl)~ zk5cLIi2R{{meWml?sf;aCwcWFJ^ z<_Pyo=cQejX~wCHQzC`@&8;X)N?mSjbpk^rOD1Wk5{d)&My@IJjN9jxGbE#`gz@2s zNR@V=qvblcIJ&;QxL4Sw^Ix`(lCdD?Et&G=+@Gu=k~2*FbrT=84h0mDT0ELt8>MoV p^5R9N={vdlTf4>Pi}l92y<)vlscoMdzc3OViLzZUV;T5o`5&dS9uxoo literal 0 HcmV?d00001