Remove Field from vortex-expr, replace with FieldName #1915

Merged · 10 commits · Jan 13, 2025
Changes from 7 commits
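
The change is mechanical at call sites: projection APIs that previously took `&[Field]` (by name or positional index) now take `&[FieldName]`, so columns are always selected by name. A minimal sketch of the new usage, assuming a `StructArray` with an `l_comment` column (the helper function is hypothetical):

```rust
use vortex::array::StructArray;
use vortex::dtype::FieldName;
use vortex::error::VortexResult;

// Hypothetical helper: project a single column by name with the post-PR API.
// Before this change the selector would have been `Field::from("l_comment")`
// or `Field::Index(n)`; positional indices are no longer accepted.
fn project_comment(array: &StructArray) -> VortexResult<StructArray> {
    array.project(&[FieldName::from("l_comment")])
}
```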
4 changes: 2 additions & 2 deletions bench-vortex/benches/compress.rs
@@ -29,7 +29,7 @@ use regex::Regex;
use simplelog::*;
use tokio::runtime::Runtime;
use vortex::array::{ChunkedArray, StructArray};
use vortex::dtype::Field;
use vortex::dtype::FieldName;
use vortex::error::VortexResult;
use vortex::file::{LayoutContext, LayoutDeserializer, VortexFileWriter, VortexReadBuilder};
use vortex::sampling_compressor::compressors::fsst::FSSTCompressor;
@@ -384,7 +384,7 @@ fn tpc_h_l_comment(c: &mut Criterion) {
.map(|chunk| {
StructArray::try_from(chunk)
.unwrap()
.project(&[Field::from("l_comment")])
.project(&[FieldName::from("l_comment")])
.unwrap()
.into_array()
})
14 changes: 6 additions & 8 deletions pyvortex/src/dataset.rs
@@ -5,9 +5,9 @@ use arrow::datatypes::SchemaRef;
use arrow::pyarrow::{IntoPyArrow, ToPyArrow};
use pyo3::exceptions::PyTypeError;
use pyo3::prelude::*;
use pyo3::types::{PyLong, PyString};
use pyo3::types::PyString;
use vortex::arrow::infer_schema;
use vortex::dtype::{DType, Field};
use vortex::dtype::{DType, FieldName};
use vortex::error::VortexResult;
use vortex::expr::RowFilter;
use vortex::file::{
@@ -66,14 +66,12 @@ pub async fn read_dtype_from_reader<T: VortexReadAt + Unpin>(reader: T) -> Vorte
}

fn projection_from_python(columns: Option<Vec<Bound<PyAny>>>) -> PyResult<Projection> {
fn field_from_pyany(field: &Bound<PyAny>) -> PyResult<Field> {
fn field_from_pyany(field: &Bound<PyAny>) -> PyResult<FieldName> {
if field.clone().is_instance_of::<PyString>() {
Ok(Field::from(field.downcast::<PyString>()?.to_str()?))
} else if field.is_instance_of::<PyLong>() {
Ok(Field::Index(field.extract()?))
Ok(FieldName::from(field.downcast::<PyString>()?.to_str()?))
} else {
Err(PyTypeError::new_err(format!(
"projection: expected list of string, int, and None, but found: {}.",
"projection: expected list of strings or None, but found: {}.",
field,
)))
}
@@ -85,7 +83,7 @@ fn projection_from_python(columns: Option<Vec<Bound<PyAny>>>) -> PyResult<Projec
columns
.iter()
.map(field_from_pyany)
.collect::<PyResult<Vec<Field>>>()?,
.collect::<PyResult<Vec<FieldName>>>()?,
),
})
}
21 changes: 4 additions & 17 deletions pyvortex/src/expr.rs
@@ -1,10 +1,8 @@
use std::sync::Arc;

use pyo3::exceptions::PyValueError;
use pyo3::prelude::*;
use pyo3::types::*;
use vortex::dtype::half::f16;
use vortex::dtype::{DType, Field, Nullability, PType};
use vortex::dtype::{DType, Nullability, PType};
use vortex::expr::{col, lit, BinaryExpr, ExprRef, GetItem, Operator};
use vortex::scalar::Scalar;

@@ -225,8 +223,8 @@ impl PyExpr {
py_binary_opeartor(self_, Operator::Or, coerce_expr(right)?)
}

fn __getitem__(self_: PyRef<'_, Self>, field: PyObject) -> PyResult<PyExpr> {
get_item(self_.py(), field, self_.clone())
fn __getitem__(self_: PyRef<'_, Self>, field: String) -> PyResult<PyExpr> {
get_item(field, self_.clone())
}
}

@@ -311,18 +309,7 @@ pub fn scalar_helper(dtype: DType, value: &Bound<'_, PyAny>) -> PyResult<Scalar>
}
}

pub fn get_item(py: Python, field: PyObject, child: PyExpr) -> PyResult<PyExpr> {
let field = if let Ok(value) = field.downcast_bound::<PyLong>(py) {
Field::Index(value.extract()?)
} else if let Ok(value) = field.downcast_bound::<PyString>(py) {
Field::Name(Arc::from(value.extract::<String>()?.as_str()))
} else {
return Err(PyValueError::new_err(format!(
"expected int, or str but found: {}",
field
)));
};

pub fn get_item(field: String, child: PyExpr) -> PyResult<PyExpr> {
Ok(PyExpr {
inner: GetItem::new_expr(field, child.inner),
})
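
On the Python side, `__getitem__` now accepts only a string field name, so `expr["l_comment"]` still works but `expr[0]` raises a `TypeError` instead of resolving to `Field::Index`. A hedged Rust sketch of the equivalent expression construction, assuming `col` accepts a column name and `GetItem::new_expr` accepts a `String` field (as the binding above now passes):

```rust
use vortex::expr::{col, ExprRef, GetItem};

// Sketch only: build the expression `col("lineitem")["l_comment"]` directly in Rust.
// The `col` argument type is assumed; the String field matches what the pyo3
// binding forwards after this change.
fn l_comment_expr() -> ExprRef {
    GetItem::new_expr("l_comment".to_string(), col("lineitem"))
}
```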
13 changes: 10 additions & 3 deletions vortex-array/src/array/chunked/variants.rs
@@ -1,4 +1,5 @@
use vortex_dtype::{DType, Field};
use itertools::Itertools;
use vortex_dtype::{DType, Field, FieldName};
use vortex_error::{vortex_err, vortex_panic, VortexExpect, VortexResult};

use crate::array::chunked::ChunkedArray;
@@ -84,7 +85,7 @@ impl StructArrayTrait for ChunkedArray {
Some(chunked)
}

fn project(&self, projection: &[Field]) -> VortexResult<ArrayData> {
fn project(&self, projection: &[FieldName]) -> VortexResult<ArrayData> {
let mut chunks = Vec::with_capacity(self.nchunks());
for chunk in self.chunks() {
chunks.push(
@@ -99,7 +100,13 @@
.dtype()
.as_struct()
.ok_or_else(|| vortex_err!("Not a struct dtype"))?
.project(projection)?;
.project(
projection
.iter()
.map(|f| Field::Name(f.clone()))
.collect_vec()
.as_slice(),
)?;
ChunkedArray::try_new(
chunks,
DType::Struct(projected_dtype, self.dtype().nullability()),
4 changes: 2 additions & 2 deletions vortex-array/src/array/constant/variants.rs
@@ -1,4 +1,4 @@
use vortex_dtype::Field;
use vortex_dtype::FieldName;
use vortex_error::{VortexError, VortexExpect as _, VortexResult};
use vortex_scalar::Scalar;

@@ -97,7 +97,7 @@ impl StructArrayTrait for ConstantArray {
.map(|scalar| ConstantArray::new(scalar, self.len()).into_array())
}

fn project(&self, projection: &[Field]) -> VortexResult<ArrayData> {
fn project(&self, projection: &[FieldName]) -> VortexResult<ArrayData> {
Ok(
ConstantArray::new(self.scalar().as_struct().project(projection)?, self.len())
.into_array(),
4 changes: 2 additions & 2 deletions vortex-array/src/array/sparse/variants.rs
@@ -1,4 +1,4 @@
use vortex_dtype::Field;
use vortex_dtype::FieldName;
use vortex_error::{vortex_err, VortexExpect, VortexResult};
use vortex_scalar::StructScalar;

@@ -87,7 +87,7 @@ impl StructArrayTrait for SparseArray {
)
}

fn project(&self, projection: &[Field]) -> VortexResult<ArrayData> {
fn project(&self, projection: &[FieldName]) -> VortexResult<ArrayData> {
let new_patches = self.patches().map_values(|values| {
values
.as_struct_array()
23 changes: 10 additions & 13 deletions vortex-array/src/array/struct_/mod.rs
@@ -110,19 +110,16 @@ impl StructArray {
/// perform column re-ordering, deletion, or duplication at a logical level, without any data
/// copying.
#[allow(clippy::same_name_method)]
pub fn project(&self, projection: &[Field]) -> VortexResult<Self> {
pub fn project(&self, projection: &[FieldName]) -> VortexResult<Self> {
let mut children = Vec::with_capacity(projection.len());
let mut names = Vec::with_capacity(projection.len());

for field in projection.iter() {
let idx = match field {
Field::Name(n) => self
.names()
.iter()
.position(|name| name == n)
.ok_or_else(|| vortex_err!("Unknown field {n}"))?,
Field::Index(i) => *i,
};
for f_name in projection.iter() {
let idx = self
.names()
.iter()
.position(|name| name == f_name)
.ok_or_else(|| vortex_err!("Unknown field {f_name}"))?;

names.push(self.names()[idx].clone());
children.push(
@@ -170,7 +167,7 @@ impl StructArrayTrait for StructArray {
)
}

fn project(&self, projection: &[Field]) -> VortexResult<ArrayData> {
fn project(&self, projection: &[FieldName]) -> VortexResult<ArrayData> {
self.project(projection).map(|a| a.into_array())
}
}
@@ -223,7 +220,7 @@ impl StatisticsVTable<StructArray> for StructEncoding {
#[cfg(test)]
mod test {
use vortex_buffer::buffer;
use vortex_dtype::{DType, Field, FieldName, FieldNames, Nullability};
use vortex_dtype::{DType, FieldName, FieldNames, Nullability};

use crate::array::primitive::PrimitiveArray;
use crate::array::struct_::StructArray;
@@ -251,7 +248,7 @@ mod test {
.unwrap();

let struct_b = struct_a
.project(&[Field::from(2usize), Field::from(0)])
.project(&[FieldName::from("zs"), FieldName::from("xs")])
.unwrap();
assert_eq!(
struct_b.names().as_ref(),
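
With `Field::Index` gone, `StructArray::project` resolves every selector against `self.names()` and errors with `Unknown field` when a name is absent, rather than falling back to a positional lookup. The doc comment above notes that projection can also reorder or duplicate columns; a small sketch of what that looks like by name, reusing the `xs`/`zs` column names from the test (setup assumed):

```rust
use vortex::array::StructArray;
use vortex::dtype::FieldName;
use vortex::error::VortexResult;

// Sketch (column names assumed): reorder and duplicate columns by name.
// A misspelled or missing name now surfaces as Err("Unknown field ...")
// instead of silently selecting a column by position.
fn reorder(array: &StructArray) -> VortexResult<StructArray> {
    array.project(&[
        FieldName::from("zs"),
        FieldName::from("xs"),
        FieldName::from("xs"), // duplication is allowed at the logical level
    ])
}
```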
4 changes: 2 additions & 2 deletions vortex-array/src/variants.rs
@@ -5,7 +5,7 @@

use std::sync::Arc;

use vortex_dtype::{DType, ExtDType, Field, FieldInfo, FieldNames, PType};
use vortex_dtype::{DType, ExtDType, Field, FieldInfo, FieldName, FieldNames, PType};
use vortex_error::{vortex_panic, VortexError, VortexExpect as _, VortexResult};

use crate::encoding::Encoding;
@@ -228,7 +228,7 @@ pub trait StructArrayTrait: ArrayTrait {
}
}

fn project(&self, projection: &[Field]) -> VortexResult<ArrayData>;
fn project(&self, projection: &[FieldName]) -> VortexResult<ArrayData>;
}

pub trait ListArrayTrait: ArrayTrait {}
34 changes: 24 additions & 10 deletions vortex-datafusion/src/memory/exec.rs
@@ -5,10 +5,11 @@ use datafusion_common::{Result as DFResult, Statistics};
use datafusion_common::{Result as DFResult, Statistics};
use datafusion_execution::{SendableRecordBatchStream, TaskContext};
use datafusion_physical_plan::{DisplayAs, DisplayFormatType, ExecutionPlan, PlanProperties};
use itertools::Itertools;
use vortex_array::array::ChunkedArray;
use vortex_array::{ArrayDType, ArrayLen};
use vortex_dtype::Field;
use vortex_error::VortexResult;
use vortex_dtype::{FieldName, FieldNames};
use vortex_error::{vortex_err, VortexResult};

use crate::memory::statistics::chunked_array_df_stats;
use crate::memory::stream::VortexRecordBatchStream;
@@ -17,7 +18,7 @@ use crate::memory::stream::VortexRecordBatchStream;
#[derive(Clone)]
pub struct VortexScanExec {
array: ChunkedArray,
scan_projection: Vec<usize>,
scan_projection: FieldNames,
plan_properties: PlanProperties,
statistics: Statistics,
}
@@ -28,7 +29,25 @@
scan_projection: Vec<usize>,
plan_properties: PlanProperties,
) -> VortexResult<Self> {
let statistics = chunked_array_df_stats(&array, &scan_projection)?;
let dtype = array.dtype().as_struct().ok_or_else(|| {
vortex_err!(
"VortexScanExec: expected struct array, found {:?}",
array.dtype()
)
})?;
let scan_projection: FieldNames = scan_projection
.iter()
.map(|idx| {
dtype.names().get(*idx).cloned().ok_or_else(|| {
vortex_err!(
"VortexScanExec: invalid field index {idx} in dtype {:?}",
dtype.names()
)
})
})
.collect::<VortexResult<Vec<FieldName>>>()?
.into();
let statistics = chunked_array_df_stats(&array, scan_projection.clone())?;
Ok(Self {
array,
scan_projection,
@@ -91,12 +110,7 @@ impl ExecutionPlan for VortexScanExec {
idx: 0,
num_chunks: self.array.nchunks(),
chunks: self.array.clone(),
projection: self
.scan_projection
.iter()
.copied()
.map(Field::from)
.collect(),
projection: self.scan_projection.iter().cloned().collect_vec().into(),
}))
}

38 changes: 29 additions & 9 deletions vortex-datafusion/src/memory/plans.rs
@@ -16,13 +16,14 @@ use datafusion_physical_expr::{EquivalenceProperties, Partitioning};
use datafusion_physical_plan::execution_plan::{Boundedness, EmissionType};
use datafusion_physical_plan::{DisplayAs, DisplayFormatType, ExecutionPlan, PlanProperties};
use futures::{ready, Stream};
use itertools::Itertools;
use pin_project::pin_project;
use vortex_array::array::ChunkedArray;
use vortex_array::arrow::FromArrowArray;
use vortex_array::compute::take;
use vortex_array::{ArrayData, IntoArrayVariant, IntoCanonical};
use vortex_dtype::Field;
use vortex_error::{vortex_err, vortex_panic, VortexError};
use vortex_dtype::{FieldName, FieldNames};
use vortex_error::{vortex_err, vortex_panic, VortexError, VortexExpect};
use vortex_expr::{ExprRef, VortexExprExt};

/// Physical plan operator that applies a set of [filters][Expr] against the input, producing a
@@ -117,7 +118,12 @@ impl ExecutionPlan for RowSelectorExec {
.into());
}

let filter_projection = self.filter_expr.references().into_iter().cloned().collect();
let filter_projection = self
.filter_expr
.references()
.into_iter()
.collect_vec()
.into();
Ok(Box::pin(RowIndicesStream {
chunked_array: self.chunked_array.clone(),
chunk_idx: 0,
@@ -132,7 +138,7 @@ pub(crate) struct RowIndicesStream {
chunked_array: ChunkedArray,
chunk_idx: usize,
conjunction_expr: ExprRef,
filter_projection: Vec<Field>,
filter_projection: FieldNames,
}

impl Stream for RowIndicesStream {
@@ -188,7 +194,7 @@ pub(crate) struct TakeRowsExec {
plan_properties: PlanProperties,

// Array storing the indices used to take the plan nodes.
projection: Vec<Field>,
projection: FieldNames,

// Input plan, a stream of indices on which we perform a take against the original dataset.
input: Arc<dyn ExecutionPlan>,
@@ -216,9 +222,23 @@ impl TakeRowsExec {
Boundedness::Bounded,
);

let names = projection
.iter()
.map(|idx| {
FieldName::from(
output_schema
.fields
.get(*idx)
.vortex_expect("Project index not in schema")
.name()
.clone(),
)
})
.collect_vec();

Self {
plan_properties,
projection: projection.iter().copied().map(Field::from).collect(),
projection: names.into(),
input: row_indices,
output_schema,
table: table.clone(),
@@ -296,7 +316,7 @@ pub(crate) struct TakeRowsStream<F> {
chunk_idx: usize,

// Projection based on the schema here
output_projection: Vec<Field>,
output_projection: FieldNames,
output_schema: SchemaRef,

// The original Vortex array we're taking from
@@ -382,7 +402,7 @@ mod test {
use vortex_array::validity::Validity;
use vortex_array::{ArrayDType, IntoArrayData};
use vortex_buffer::buffer;
use vortex_dtype::{Field, FieldName};
use vortex_dtype::FieldName;
use vortex_expr::datafusion::convert_expr_to_vortex;

use crate::memory::plans::{RowIndicesStream, ROW_SELECTOR_SCHEMA_REF};
@@ -419,7 +439,7 @@
chunked_array,
chunk_idx: 0,
conjunction_expr: convert_expr_to_vortex(df_expr).unwrap(),
filter_projection: vec![Field::from(0), Field::from(1)],
filter_projection: [FieldName::from("a"), FieldName::from("b")].into(),
};

let rows: Vec<RecordBatch> = futures::executor::block_on_stream(filtering_stream)