Skip to content

Commit

Permalink
slight restructure
Browse files Browse the repository at this point in the history
  • Loading branch information
andrewgazelka committed Dec 19, 2024
1 parent 0aa67fb commit ce15fe0
Show file tree
Hide file tree
Showing 2 changed files with 46 additions and 44 deletions.
5 changes: 2 additions & 3 deletions src/daft-connect/src/translation/logical_plan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,6 @@ use futures::TryStreamExt;
use spark_connect::{relation::RelType, Limit, Relation, ShowString};
use tracing::warn;

use crate::translation::logical_plan::with_columns_renamed::with_columns_renamed;

mod aggregate;
mod drop;
mod filter;
Expand Down Expand Up @@ -113,7 +111,8 @@ impl SparkAnalyzer<'_> {
self.local_relation(plan_id, l)
.wrap_err("Failed to apply local_relation to logical plan")
}
RelType::WithColumnsRenamed(w) => with_columns_renamed(*w)
RelType::WithColumnsRenamed(w) => self
.with_columns_renamed(*w)
.await

Check warning on line 116 in src/daft-connect/src/translation/logical_plan.rs

View check run for this annotation

Codecov / codecov/patch

src/daft-connect/src/translation/logical_plan.rs#L116

Added line #L116 was not covered by tests
.wrap_err("Failed to apply with_columns_renamed to logical plan"),
RelType::Read(r) => read::read(r)
Expand Down
Original file line number Diff line number Diff line change
@@ -1,45 +1,48 @@
use daft_dsl::col;
use daft_logical_plan::LogicalPlanBuilder;
use eyre::{bail, Context};

use crate::translation::Plan;

pub async fn with_columns_renamed(
with_columns_renamed: spark_connect::WithColumnsRenamed,
) -> eyre::Result<Plan> {
let spark_connect::WithColumnsRenamed {
input,
rename_columns_map,
renames,
} = with_columns_renamed;

let Some(input) = input else {
bail!("Input is required");
};

let mut plan = Box::pin(crate::translation::to_logical_plan(*input)).await?;

// todo: let's implement this directly into daft

// Convert the rename mappings into expressions
let rename_exprs = if !rename_columns_map.is_empty() {
// Use rename_columns_map if provided (legacy format)
rename_columns_map
.into_iter()
.map(|(old_name, new_name)| col(old_name.as_str()).alias(new_name.as_str()))
.collect()
} else {
// Use renames if provided (new format)
renames
.into_iter()
.map(|rename| col(rename.col_name.as_str()).alias(rename.new_col_name.as_str()))
.collect()
};

// Apply the rename expressions to the plan
plan.builder = plan
.builder
.select(rename_exprs)
.wrap_err("Failed to apply rename expressions to logical plan")?;

Ok(plan)
use crate::translation::SparkAnalyzer;

impl SparkAnalyzer<'_> {
pub async fn with_columns_renamed(
&self,
with_columns_renamed: spark_connect::WithColumnsRenamed,
) -> eyre::Result<LogicalPlanBuilder> {
let spark_connect::WithColumnsRenamed {
input,
rename_columns_map,
renames,
} = with_columns_renamed;

let Some(input) = input else {
bail!("Input is required");

Check warning on line 19 in src/daft-connect/src/translation/logical_plan/with_columns_renamed.rs

View check run for this annotation

Codecov / codecov/patch

src/daft-connect/src/translation/logical_plan/with_columns_renamed.rs#L19

Added line #L19 was not covered by tests
};

let plan = Box::pin(self.to_logical_plan(*input)).await?;

// todo: let's implement this directly into daft

// Convert the rename mappings into expressions
let rename_exprs = if !rename_columns_map.is_empty() {
// Use rename_columns_map if provided (legacy format)
rename_columns_map
.into_iter()
.map(|(old_name, new_name)| col(old_name.as_str()).alias(new_name.as_str()))
.collect()
} else {
// Use renames if provided (new format)
renames
.into_iter()
.map(|rename| col(rename.col_name.as_str()).alias(rename.new_col_name.as_str()))
.collect()

Check warning on line 38 in src/daft-connect/src/translation/logical_plan/with_columns_renamed.rs

View check run for this annotation

Codecov / codecov/patch

src/daft-connect/src/translation/logical_plan/with_columns_renamed.rs#L35-L38

Added lines #L35 - L38 were not covered by tests
};

// Apply the rename expressions to the plan
let plan = plan
.select(rename_exprs)
.wrap_err("Failed to apply rename expressions to logical plan")?;

Ok(plan)
}
}

0 comments on commit ce15fe0

Please sign in to comment.