Skip to content

Commit

Permalink
feat: add unlatched map spread and largeutf8 spreads (#529)
Browse files Browse the repository at this point in the history
Adds unlatched map spread to the merge operation, which allows us to
have non-stateful `DataType::Map` types in `merge`.

Snuck in support for `LargeUtf8`.
  • Loading branch information
jordanrfrazier authored Jul 25, 2023
1 parent a57eb1b commit 74ed812
Show file tree
Hide file tree
Showing 2 changed files with 288 additions and 35 deletions.
61 changes: 47 additions & 14 deletions crates/sparrow-main/tests/e2e/map_tests.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
//! e2e tests for map types.

use sparrow_api::kaskada::v1alpha::TableConfig;
use uuid::Uuid;

Expand Down Expand Up @@ -108,20 +109,6 @@ async fn test_bool_to_s_get_static_key() {
"###);
}

#[tokio::test]
async fn test_first_last_map() {
    // Map columns are not supported by the csv writer at the moment, so this
    // test pins the hash of the parquet output instead. The output behind the
    // expected hash was verified manually.
    const QUERY: &str = "{ first: Input.s_to_i64 | first(), last: Input.s_to_i64 | last() }";
    const EXPECTED_HASH: &str = "AB719CF6634779A5285D699A178AC69354696872E3733AA9388C9A6A";

    let hash = QueryFixture::new(QUERY)
        .run_to_parquet_hash(&map_data_fixture().await)
        .await
        .unwrap();

    assert_eq!(hash, EXPECTED_HASH);
}

#[tokio::test]
async fn test_s_to_i64_get_with_first_last_agg() {
// Note that the last_f2 is empty. This is expected because the last() aggregation
Expand All @@ -136,6 +123,18 @@ async fn test_s_to_i64_get_with_first_last_agg() {
"###);
}

#[tokio::test]
async fn test_map_output_into_sum_aggregation() {
insta::assert_snapshot!(QueryFixture::new("{ sum: Input.s_to_i64 | get(\"f1\") | sum(), value: Input.s_to_i64 | get(Input.s_to_i64_key) } | with_key(Input.s_to_i64_key)").run_to_csv(&map_data_fixture().await).await.unwrap(), @r###"
_time,_subsort,_key_hash,_key,sum,value
1996-12-19T16:39:57.000000000,0,18146622110643880433,f1,0,0
1996-12-19T16:40:57.000000000,0,7541589802123724450,f2,1,10
1996-12-19T16:40:59.000000000,0,5533153676183607778,f3,6,
1996-12-19T16:41:57.000000000,0,7541589802123724450,f2,6,13
1996-12-19T16:42:57.000000000,0,5533153676183607778,f3,21,11
"###);
}

#[tokio::test]
#[ignore = "https://docs.rs/arrow-ord/44.0.0/src/arrow_ord/comparison.rs.html#1746"]
async fn test_map_equality() {
Expand All @@ -144,6 +143,40 @@ async fn test_map_equality() {
"###);
}

#[tokio::test]
async fn test_query_with_merge_and_map_output() {
    // This query produces a `merge` operation with `map` inputs, which verifies
    // that maps are supported by the _unlatched_ `spread` operation as well.
    // Note that the _latched_ spread is a separate implementation.
    //
    // The query also emits a `map` as an output, which verifies that maps can
    // be written to parquet.
    const QUERY: &str = "{ map: Input.s_to_i64, value: Input.s_to_i64 | get(Input.s_to_i64_key), lookup: lookup(Input.s_to_i64_key as u64, Input) }";
    const EXPECTED_HASH: &str = "92C3C8B7E6AE6AF41266B63F3FBE11958DB5BFD23B58E891963F6287";

    let hash = QueryFixture::new(QUERY)
        .run_to_parquet_hash(&map_data_fixture().await)
        .await
        .unwrap();

    assert_eq!(EXPECTED_HASH, hash);
}

#[tokio::test]
async fn test_first_last_map() {
    // Map columns are not supported by the csv writer at the moment, so this
    // test pins the hash of the parquet output instead. The output behind the
    // expected hash was verified manually.
    const QUERY: &str = "{ first: Input.s_to_i64 | first(), last: Input.s_to_i64 | last() }";
    const EXPECTED_HASH: &str = "AB719CF6634779A5285D699A178AC69354696872E3733AA9388C9A6A";

    let hash = QueryFixture::new(QUERY)
        .run_to_parquet_hash(&map_data_fixture().await)
        .await
        .unwrap();

    assert_eq!(hash, EXPECTED_HASH);
}

#[tokio::test]
async fn test_swapped_args_for_get_map() {
insta::assert_yaml_snapshot!(QueryFixture::new("{ f1: get(Input.s_to_i64, \"f1\") }")
Expand Down
Loading

0 comments on commit 74ed812

Please sign in to comment.