Skip to content

Commit

Permalink
test: early filter method with IN operator
Browse files Browse the repository at this point in the history
  • Loading branch information
omkar-foss authored and ion-elgreco committed Aug 30, 2024
1 parent ea4c8b5 commit 35a0608
Showing 1 changed file with 276 additions and 0 deletions.
276 changes: 276 additions & 0 deletions crates/core/src/operations/merge/filter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -657,4 +657,280 @@ mod tests {
);
assert_eq!(pred.unwrap(), filter);
}

#[tokio::test]
async fn test_try_construct_early_filter_with_is_in_literals() {
let schema = get_arrow_schema(&None);
let table = setup_table(Some(vec!["modified"])).await;

assert_eq!(table.version(), 0);
assert_eq!(table.get_files_count(), 0);

let ctx = SessionContext::new();
let batch = RecordBatch::try_new(
Arc::clone(&schema),
vec![
Arc::new(arrow::array::StringArray::from(vec!["A", "B", "C"])),
Arc::new(arrow::array::Int32Array::from(vec![10, 20, 30])),
Arc::new(arrow::array::StringArray::from(vec![
"2023-07-04",
"2023-07-05",
"2023-07-05"
])),
],
)
.unwrap();
let source_df = ctx.read_batch(batch).unwrap();

let source_name = TableReference::parse_str("source");
let target_name = TableReference::parse_str("target");

let source_plan = LogicalPlanBuilder::scan(
source_name.clone(),
provider_as_source(source_df.into_view()),
None,
)
.unwrap()
.build()
.unwrap();

let join_predicate = col(Column {
relation: Some(source_name.clone()),
name: "id".to_owned(),
})
.eq(col(Column {
relation: Some(target_name.clone()),
name: "id".to_owned(),
}))
.and(
col("modified".to_owned())
.in_list(vec![
lit("2023-07-05"), lit("2023-07-06"), lit("2023-07-07")
], false),
);

let pred = try_construct_early_filter(
join_predicate,
table.snapshot().unwrap(),
&ctx.state(),
&source_plan,
&source_name,
&target_name,
)
.await
.unwrap();

assert!(pred.is_some());

let filter = col(Column {
relation: Some(target_name.clone()),
name: "id".to_owned(),
})
.between(
Expr::Literal(ScalarValue::Utf8(Some("A".to_string()))),
Expr::Literal(ScalarValue::Utf8(Some("C".to_string()))),
)
.and(
col(Column {
relation: None,
name: "modified".to_owned(),
}).in_list(vec![
Expr::Literal(ScalarValue::Utf8(Some("2023-07-05".to_string()))),
Expr::Literal(ScalarValue::Utf8(Some("2023-07-06".to_string()))),
Expr::Literal(ScalarValue::Utf8(Some("2023-07-07".to_string())))
], false),
);
assert_eq!(pred.unwrap(), filter);
}

#[tokio::test]
async fn test_try_construct_early_filter_with_is_in_columns() {
let schema = get_arrow_schema(&None);
let table = setup_table(Some(vec!["modified"])).await;

assert_eq!(table.version(), 0);
assert_eq!(table.get_files_count(), 0);

let ctx = SessionContext::new();
let batch = RecordBatch::try_new(
Arc::clone(&schema),
vec![
Arc::new(arrow::array::StringArray::from(vec!["A", "B", "C"])),
Arc::new(arrow::array::Int32Array::from(vec![10, 20, 30])),
Arc::new(arrow::array::StringArray::from(vec![
"2023-07-04",
"2023-07-05",
"2023-07-05"
])),
],
)
.unwrap();
let source_df = ctx.read_batch(batch).unwrap();

let source_name = TableReference::parse_str("source");
let target_name = TableReference::parse_str("target");

let source_plan = LogicalPlanBuilder::scan(
source_name.clone(),
provider_as_source(source_df.into_view()),
None,
)
.unwrap()
.build()
.unwrap();

let join_predicate = col(Column {
relation: Some(source_name.clone()),
name: "id".to_owned(),
})
.eq(col(Column {
relation: Some(target_name.clone()),
name: "id".to_owned(),
}))
.and(
col("modified".to_owned())
.in_list(vec![
col(Column {
relation: Some(target_name.clone()),
name: "id".to_owned(),
}),
col(Column {
relation: Some(target_name.clone()),
name: "modified".to_owned(),
})
], false),
);

let pred = try_construct_early_filter(
join_predicate,
table.snapshot().unwrap(),
&ctx.state(),
&source_plan,
&source_name,
&target_name,
)
.await
.unwrap();

assert!(pred.is_some());

let filter = col(Column {
relation: Some(target_name.clone()),
name: "id".to_owned(),
})
.between(
Expr::Literal(ScalarValue::Utf8(Some("A".to_string()))),
Expr::Literal(ScalarValue::Utf8(Some("C".to_string()))),
)
.and(
col(Column {
relation: None,
name: "modified".to_owned(),
}).in_list(vec![
col(Column {
relation: Some(target_name.clone()),
name: "id".to_owned(),
}),
col(Column {
relation: Some(target_name.clone()),
name: "modified".to_owned(),
})
], false),
);
assert_eq!(pred.unwrap(), filter);
}

#[tokio::test]
async fn test_try_construct_early_filter_with_is_in_ident_and_cols() {
let schema = get_arrow_schema(&None);
let table = setup_table(Some(vec!["modified"])).await;

assert_eq!(table.version(), 0);
assert_eq!(table.get_files_count(), 0);

let ctx = SessionContext::new();
let batch = RecordBatch::try_new(
Arc::clone(&schema),
vec![
Arc::new(arrow::array::StringArray::from(vec!["A", "B", "C"])),
Arc::new(arrow::array::Int32Array::from(vec![10, 20, 30])),
Arc::new(arrow::array::StringArray::from(vec![
"2023-07-04",
"2023-07-05",
"2023-07-05"
])),
],
)
.unwrap();
let source_df = ctx.read_batch(batch).unwrap();

let source_name = TableReference::parse_str("source");
let target_name = TableReference::parse_str("target");

let source_plan = LogicalPlanBuilder::scan(
source_name.clone(),
provider_as_source(source_df.into_view()),
None,
)
.unwrap()
.build()
.unwrap();

let join_predicate = col(Column {
relation: Some(source_name.clone()),
name: "id".to_owned(),
})
.eq(col(Column {
relation: Some(target_name.clone()),
name: "id".to_owned(),
}))
.and(
ident("source.id")
.in_list(vec![
col(Column {
relation: Some(target_name.clone()),
name: "id".to_owned(),
}),
col(Column {
relation: Some(target_name.clone()),
name: "modified".to_owned(),
})
], false),
);

let pred = try_construct_early_filter(
join_predicate,
table.snapshot().unwrap(),
&ctx.state(),
&source_plan,
&source_name,
&target_name,
)
.await
.unwrap();

assert!(pred.is_some());

let filter = col(Column {
relation: Some(target_name.clone()),
name: "id".to_owned(),
})
.between(
Expr::Literal(ScalarValue::Utf8(Some("A".to_string()))),
Expr::Literal(ScalarValue::Utf8(Some("C".to_string()))),
)
.and(
ident("source.id").in_list(vec![
col(Column {
relation: Some(target_name.clone()),
name: "id".to_owned(),
}),
col(Column {
relation: Some(target_name.clone()),
name: "modified".to_owned(),
})
], false),
);
assert_eq!(pred.unwrap(), filter);
}
}

0 comments on commit 35a0608

Please sign in to comment.