Skip to content

Commit

Permalink
adding tests, removing integration test for now
Browse files Browse the repository at this point in the history
  • Loading branch information
Alon Agmon committed Aug 28, 2024
1 parent e8bd953 commit 9a54ef9
Show file tree
Hide file tree
Showing 2 changed files with 8 additions and 45 deletions.
17 changes: 8 additions & 9 deletions crates/integrations/datafusion/src/physical_plan/scan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -161,10 +161,9 @@ fn convert_filters_to_predicate(filters: &[Expr]) -> Option<Predicate> {
.reduce(Predicate::and)
}

/// Converts a DataFusion [`Expr`] to an Iceberg [`Predicate`].
/// Recuresivly converting DataFusion filters ( in a [`Expr`]) to an Iceberg [`Predicate`].
///
/// This function handles the conversion of certain DataFusion expression types
/// to their corresponding Iceberg predicates. It supports the following cases:
/// This function currently handles the conversion of DataFusion expression of the following types:
///
/// 1. Simple binary expressions (e.g., "column < value")
/// 2. Compound AND expressions (e.g., "x < 1 AND y > 10")
Expand All @@ -188,13 +187,13 @@ fn expr_to_predicate(expr: &Expr) -> Option<Predicate> {
match expr {
Expr::BinaryExpr(BinaryExpr { left, op, right }) => {
match (left.as_ref(), op, right.as_ref()) {
// first option: x < 1
// First option arm (simple case), e.g. x < 1
(Expr::Column(col), op, Expr::Literal(lit)) => {
let reference = Reference::new(col.name.clone());
let datum = scalar_value_to_datum(lit)?;
Some(binary_op_to_predicate(reference, op, datum))
}
// second option (inner AND): x < 1 AND y > 10
// Second option arm (inner AND), e.g. x < 1 AND y > 10
// if its an AND expression and one predicate fails, we can still go with the other one
(left_expr, Operator::And, right_expr) => {
let left_pred = expr_to_predicate(&left_expr.clone());
Expand All @@ -206,8 +205,8 @@ fn expr_to_predicate(expr: &Expr) -> Option<Predicate> {
(None, None) => None,
}
}
// third option (inner OR): x < 1 OR y > 10
// if one is unsuported, we need to fail the predicate
// Third option arm (inner OR), e.g. x < 1 OR y > 10
// if one is unsuported, we fail the predicate

Check warning on line 209 in crates/integrations/datafusion/src/physical_plan/scan.rs

View workflow job for this annotation

GitHub Actions / typos check

"unsuported" should be "unsupported".
(Expr::BinaryExpr(left_expr), Operator::Or, Expr::BinaryExpr(right_expr)) => {
let left_pred = expr_to_predicate(&Expr::BinaryExpr(left_expr.clone()))?;
let right_pred = expr_to_predicate(&Expr::BinaryExpr(right_expr.clone()))?;
Expand Down Expand Up @@ -292,11 +291,11 @@ mod tests {
.parse_sql_expr(sql, &df_schema)
.unwrap();
let predicate = convert_filters_to_predicate(&[expr]).unwrap();
let inner_predicate = Predicate::and(
let expected_predicate = Predicate::and(
Reference::new("foo").greater_than(Datum::long(1)),
Reference::new("bar").equal_to(Datum::string("test")),
);
assert_eq!(predicate, inner_predicate);
assert_eq!(predicate, expected_predicate);
}

#[test]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -147,39 +147,3 @@ async fn test_provider_list_schema_names() -> Result<()> {
.all(|item| result.contains(&item.to_string())));
Ok(())
}
#[tokio::test]
async fn test_table_scan() -> Result<()> {
let iceberg_catalog = get_iceberg_catalog();
let namespace = NamespaceIdent::new("test_provider_list_table_names".to_string());
set_test_namespace(&iceberg_catalog, &namespace).await?;
let creation = set_table_creation(temp_path(), "my_table")?;
let new_table = iceberg_catalog.create_table(&namespace, creation).await?;
let client = Arc::new(iceberg_catalog);
let catalog = Arc::new(IcebergCatalogProvider::try_new(client).await?);
let ctx = SessionContext::new();

ctx.register_catalog("catalog", catalog);
let df = ctx
.sql("select * from catalog.test_provider_list_table_names.my_table where (foo > 1 and bar = 'test') or foo < 0 ")
.await
.unwrap();

let compute_result = df.collect().await;
if let Ok(df) = compute_result {
println!("==> compute_result OK: {:?}", df);
} else {
println!(
"==> compute_result ERROR: {:?}",
compute_result.err().unwrap()
);
}
let provider = ctx.catalog("catalog").unwrap();
let schema = provider.schema("test_provider_list_table_names").unwrap();

let expected = vec!["my_table"];
let result = schema.table_names();

assert_eq!(result, expected);

Ok(())
}

0 comments on commit 9a54ef9

Please sign in to comment.