Skip to content

Commit

Permalink
[FEAT] (WIP) connect: createDataFrame
Browse files Browse the repository at this point in the history
  • Loading branch information
andrewgazelka committed Nov 20, 2024
1 parent 81ff3b6 commit 866aa05
Show file tree
Hide file tree
Showing 4 changed files with 60 additions and 1 deletion.
7 changes: 6 additions & 1 deletion src/daft-connect/src/translation/logical_plan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,15 @@ use eyre::{bail, Context};
use spark_connect::{relation::RelType, Relation};
use tracing::warn;

use crate::translation::logical_plan::{aggregate::aggregate, project::project, range::range};
use crate::translation::logical_plan::{
aggregate::aggregate, project::project, range::range, to_df::to_df,
};

mod aggregate;
mod project;
mod range;
mod to_df;
mod

pub fn to_logical_plan(relation: Relation) -> eyre::Result<LogicalPlanBuilder> {
if let Some(common) = relation.common {
Expand All @@ -24,6 +28,7 @@ pub fn to_logical_plan(relation: Relation) -> eyre::Result<LogicalPlanBuilder> {
RelType::Aggregate(a) => {
aggregate(*a).wrap_err("Failed to apply aggregate to logical plan")
}
RelType::ToDf(t) => to_df(*t).wrap_err("Failed to apply to_df to logical plan"),
plan => bail!("Unsupported relation type: {plan:?}"),
}
}
25 changes: 25 additions & 0 deletions src/daft-connect/src/translation/logical_plan/to_df.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
use eyre::{bail, WrapErr};

use crate::translation::to_logical_plan;

pub fn to_df(to_df: spark_connect::ToDf) -> eyre::Result<daft_logical_plan::LogicalPlanBuilder> {
let spark_connect::ToDf {
input,
column_names,
} = to_df;

let Some(input) = input else {
bail!("Input is required");
};

let plan = to_logical_plan(*input)
.wrap_err_with(|| format!("Failed to translate relation to logical plan: {input:?}"))?;

let column_names: Vec<_> = column_names.iter().map(daft_dsl::col).collect();

let plan = plan
.with_columns(column_names)
.wrap_err_with(|| format!("Failed to add columns to logical plan: {column_names:?}"))?;

Ok(plan)
}
12 changes: 12 additions & 0 deletions tests/connect/test_create_df.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
from __future__ import annotations


def test_create_df(spark_session):
# Create simple DataFrame
data = [(1,), (2,), (3,)]
df = spark_session.createDataFrame(data, ["id"])

# Convert to pandas
df_pandas = df.toPandas()
assert len(df_pandas) == 3, "DataFrame should have 3 rows"
assert list(df_pandas["id"]) == [1, 2, 3], "DataFrame should contain expected values"
17 changes: 17 additions & 0 deletions tests/connect/test_distinct.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
from __future__ import annotations

from pyspark.sql.functions import col


def test_distinct(spark_session):
# Create DataFrame with duplicates
data = [(1,), (1,), (2,), (2,), (3,)]
df = spark_session.createDataFrame(data, ["id"])

# Get distinct rows
df_distinct = df.distinct()

# Verify distinct operation removed duplicates
df_distinct_pandas = df_distinct.toPandas()
assert len(df_distinct_pandas) == 3, "Distinct should remove duplicates"
assert set(df_distinct_pandas["id"]) == {1, 2, 3}, "Distinct values should be preserved"

0 comments on commit 866aa05

Please sign in to comment.