diff --git a/src/daft-connect/src/translation/logical_plan.rs b/src/daft-connect/src/translation/logical_plan.rs index 93c9e9bd4a..bff60f62cf 100644 --- a/src/daft-connect/src/translation/logical_plan.rs +++ b/src/daft-connect/src/translation/logical_plan.rs @@ -3,9 +3,12 @@ use eyre::{bail, Context}; use spark_connect::{relation::RelType, Limit, Relation}; use tracing::warn; -use crate::translation::logical_plan::{aggregate::aggregate, project::project, range::range}; +use crate::translation::logical_plan::{ + aggregate::aggregate, filter::filter, project::project, range::range, +}; mod aggregate; +mod filter; mod project; mod range; @@ -22,6 +25,7 @@ pub fn to_logical_plan(relation: Relation) -> eyre::Result { RelType::Limit(l) => limit(*l).wrap_err("Failed to apply limit to logical plan"), RelType::Range(r) => range(r).wrap_err("Failed to apply range to logical plan"), RelType::Project(p) => project(*p).wrap_err("Failed to apply project to logical plan"), + RelType::Filter(f) => filter(*f).wrap_err("Failed to apply filter to logical plan"), RelType::Aggregate(a) => { aggregate(*a).wrap_err("Failed to apply aggregate to logical plan") } diff --git a/src/daft-connect/src/translation/logical_plan/filter.rs b/src/daft-connect/src/translation/logical_plan/filter.rs new file mode 100644 index 0000000000..2a5df304f2 --- /dev/null +++ b/src/daft-connect/src/translation/logical_plan/filter.rs @@ -0,0 +1,25 @@ +use eyre::bail; + +use crate::translation::{to_daft_expr, to_logical_plan}; + +pub fn filter( + filter: spark_connect::Filter, +) -> eyre::Result { + let spark_connect::Filter { input, condition } = filter; + + let Some(input) = input else { + bail!("input is required"); + }; + + let Some(condition) = condition else { + bail!("condition is required"); + }; + + let condition = to_daft_expr(&condition)?; + + let plan = to_logical_plan(*input)?; + + let plan = plan.filter(condition)?; + + Ok(plan) +} diff --git a/tests/connect/test_filter.py b/tests/connect/test_filter.py new file mode 100644 index 0000000000..1586c7e7b5 --- /dev/null +++ b/tests/connect/test_filter.py @@ -0,0 +1,19 @@ +from __future__ import annotations + +from pyspark.sql.functions import col + + +def test_filter(spark_session): + # Create DataFrame from range(10) + df = spark_session.range(10) + + # Filter for values less than 5 + df_filtered = df.filter(col("id") < 5) + + # Verify the schema is unchanged after filter + assert df_filtered.schema == df.schema, "Schema should be unchanged after filter" + + # Verify the filtered data is correct + df_filtered_pandas = df_filtered.toPandas() + assert len(df_filtered_pandas) == 5, "Should have 5 rows after filtering < 5" + assert all(df_filtered_pandas["id"] < 5), "All values should be less than 5"