Skip to content

Commit

Permalink
assign types to merge inputs
Browse files Browse the repository at this point in the history
  • Loading branch information
bjchambers committed Oct 17, 2023
1 parent 9a86aa4 commit 9683be4
Show file tree
Hide file tree
Showing 2 changed files with 97 additions and 5 deletions.
20 changes: 17 additions & 3 deletions crates/sparrow-backend/src/logical_to_physical.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@
//! For example, if the sequence of expressions exceeded a certain
//! threshold, or contained a UDF, we could introduce a projection.

use arrow_schema::DataType;
use arrow_schema::{DataType, Field};
use egg::ENodeOrVar;
use itertools::Itertools;
use smallvec::smallvec;
Expand Down Expand Up @@ -136,11 +136,21 @@ impl LogicalToPhysical {
// of binary merges by choosing the order to perform the merges based on the number
// of rows.
let inputs: Vec<_> = must_merge.into_iter().collect();
let result_fields: arrow_schema::Fields = inputs
.iter()
.map(|input_step| {
self.step_type(*input_step).map(|data_type| {
Field::new(format!("step_{}", input_step), data_type, true)
})
})
.try_collect()?;
let result_type = DataType::Struct(result_fields);

let merged_step = self.plan.get_or_create_step_id(
sparrow_physical::StepKind::Merge,
inputs.clone(),
ExprVec::empty(),
DataType::Null,
result_type,
);
let exprs = args
.into_iter()
Expand Down Expand Up @@ -343,13 +353,17 @@ impl LogicalToPhysical {
Ok(plan)
}

/// Returns the result type the plan has recorded for the step `step_id`.
///
/// The type is fetched via `self.plan.step_result_type` and handed back as an
/// owned clone. As written, this body only ever produces `Ok`; the `Result`
/// return type lets callers such as `reference_type` return the call directly.
fn step_type(&self, step_id: sparrow_physical::StepId) -> error_stack::Result<DataType, Error> {
    let recorded_type = self.plan.step_result_type(step_id);
    Ok(recorded_type.clone())
}

fn reference_type(&self, reference: &Reference) -> error_stack::Result<DataType, Error> {
match reference.expr.last() {
ENodeOrVar::Var(var) if *var == *crate::exprs::INPUT_VAR => {
let reference_step_id = reference
.step_id
.ok_or_else(|| Error::internal("literal expressions don't have `?input`"))?;
Ok(self.plan.step_result_type(reference_step_id).clone())
self.step_type(reference_step_id)
}
ENodeOrVar::Var(other) => {
error_stack::bail!(Error::internal(format!(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,46 @@ steps:
inputs:
- 0
- 1
result_type: "Null"
result_type:
Struct:
- name: step_0
data_type:
Struct:
- name: x
data_type: Int64
nullable: false
dict_id: 0
dict_is_ordered: false
metadata: {}
- name: y
data_type: Float64
nullable: false
dict_id: 0
dict_is_ordered: false
metadata: {}
nullable: true
dict_id: 0
dict_is_ordered: false
metadata: {}
- name: step_1
data_type:
Struct:
- name: x
data_type: Int64
nullable: false
dict_id: 0
dict_is_ordered: false
metadata: {}
- name: y
data_type: Float64
nullable: false
dict_id: 0
dict_is_ordered: false
metadata: {}
nullable: true
dict_id: 0
dict_is_ordered: false
metadata: {}
exprs: []
- id: 3
kind: project
Expand All @@ -59,7 +98,46 @@ steps:
- name: input
literal_args: []
args: []
result_type: "Null"
result_type:
Struct:
- name: step_0
data_type:
Struct:
- name: x
data_type: Int64
nullable: false
dict_id: 0
dict_is_ordered: false
metadata: {}
- name: y
data_type: Float64
nullable: false
dict_id: 0
dict_is_ordered: false
metadata: {}
nullable: true
dict_id: 0
dict_is_ordered: false
metadata: {}
- name: step_1
data_type:
Struct:
- name: x
data_type: Int64
nullable: false
dict_id: 0
dict_is_ordered: false
metadata: {}
- name: y
data_type: Float64
nullable: false
dict_id: 0
dict_is_ordered: false
metadata: {}
nullable: true
dict_id: 0
dict_is_ordered: false
metadata: {}
- name: fieldref
literal_args:
- Utf8: step_1
Expand Down

0 comments on commit 9683be4

Please sign in to comment.