Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Remove Field from vortex-expr, replace with FieldName #1915

Merged
merged 10 commits into from
Jan 13, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions bench-vortex/benches/compress.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ use regex::Regex;
use simplelog::*;
use tokio::runtime::Runtime;
use vortex::array::{ChunkedArray, StructArray};
use vortex::dtype::Field;
use vortex::dtype::FieldName;
use vortex::error::VortexResult;
use vortex::file::{LayoutContext, LayoutDeserializer, VortexFileWriter, VortexReadBuilder};
use vortex::sampling_compressor::compressors::fsst::FSSTCompressor;
Expand Down Expand Up @@ -384,7 +384,7 @@ fn tpc_h_l_comment(c: &mut Criterion) {
.map(|chunk| {
StructArray::try_from(chunk)
.unwrap()
.project(&[Field::from("l_comment")])
.project(&[FieldName::from("l_comment")])
.unwrap()
.into_array()
})
Expand Down
14 changes: 6 additions & 8 deletions pyvortex/src/dataset.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,9 @@ use arrow::datatypes::SchemaRef;
use arrow::pyarrow::{IntoPyArrow, ToPyArrow};
use pyo3::exceptions::PyTypeError;
use pyo3::prelude::*;
use pyo3::types::{PyLong, PyString};
use pyo3::types::PyString;
use vortex::arrow::infer_schema;
use vortex::dtype::{DType, Field};
use vortex::dtype::{DType, FieldName};
use vortex::error::VortexResult;
use vortex::expr::RowFilter;
use vortex::file::{
Expand Down Expand Up @@ -66,14 +66,12 @@ pub async fn read_dtype_from_reader<T: VortexReadAt + Unpin>(reader: T) -> Vorte
}

fn projection_from_python(columns: Option<Vec<Bound<PyAny>>>) -> PyResult<Projection> {
fn field_from_pyany(field: &Bound<PyAny>) -> PyResult<Field> {
fn field_from_pyany(field: &Bound<PyAny>) -> PyResult<FieldName> {
if field.clone().is_instance_of::<PyString>() {
Ok(Field::from(field.downcast::<PyString>()?.to_str()?))
} else if field.is_instance_of::<PyLong>() {
Ok(Field::Index(field.extract()?))
Ok(FieldName::from(field.downcast::<PyString>()?.to_str()?))
} else {
Err(PyTypeError::new_err(format!(
"projection: expected list of string, int, and None, but found: {}.",
"projection: expected list of strings or None, but found: {}.",
field,
)))
}
Expand All @@ -85,7 +83,7 @@ fn projection_from_python(columns: Option<Vec<Bound<PyAny>>>) -> PyResult<Projec
columns
.iter()
.map(field_from_pyany)
.collect::<PyResult<Vec<Field>>>()?,
.collect::<PyResult<Vec<FieldName>>>()?,
),
})
}
Expand Down
21 changes: 4 additions & 17 deletions pyvortex/src/expr.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,8 @@
use std::sync::Arc;

use pyo3::exceptions::PyValueError;
use pyo3::prelude::*;
use pyo3::types::*;
use vortex::dtype::half::f16;
use vortex::dtype::{DType, Field, Nullability, PType};
use vortex::dtype::{DType, Nullability, PType};
use vortex::expr::{col, lit, BinaryExpr, ExprRef, GetItem, Operator};
use vortex::scalar::Scalar;

Expand Down Expand Up @@ -225,8 +223,8 @@ impl PyExpr {
py_binary_opeartor(self_, Operator::Or, coerce_expr(right)?)
}

fn __getitem__(self_: PyRef<'_, Self>, field: PyObject) -> PyResult<PyExpr> {
get_item(self_.py(), field, self_.clone())
fn __getitem__(self_: PyRef<'_, Self>, field: String) -> PyResult<PyExpr> {
get_item(field, self_.clone())
}
}

Expand Down Expand Up @@ -311,18 +309,7 @@ pub fn scalar_helper(dtype: DType, value: &Bound<'_, PyAny>) -> PyResult<Scalar>
}
}

pub fn get_item(py: Python, field: PyObject, child: PyExpr) -> PyResult<PyExpr> {
let field = if let Ok(value) = field.downcast_bound::<PyLong>(py) {
Field::Index(value.extract()?)
} else if let Ok(value) = field.downcast_bound::<PyString>(py) {
Field::Name(Arc::from(value.extract::<String>()?.as_str()))
} else {
return Err(PyValueError::new_err(format!(
"expected int, or str but found: {}",
field
)));
};

pub fn get_item(field: String, child: PyExpr) -> PyResult<PyExpr> {
Ok(PyExpr {
inner: GetItem::new_expr(field, child.inner),
})
Expand Down
15 changes: 0 additions & 15 deletions pyvortex/src/io.rs
Original file line number Diff line number Diff line change
Expand Up @@ -72,21 +72,6 @@ use crate::{PyArray, TOKIO_RUNTIME};
/// null
/// ]
///
/// Read just the name column, by its index:
///
/// >>> d = vortex.io.read_path("a.vortex", projection = [1])
/// >>> d.to_arrow_array()
/// <pyarrow.lib.StructArray object at ...>
/// -- is_valid: all not null
/// -- child 0 type: string_view
/// [
/// "Joseph",
/// null,
/// "Angela",
/// "Mikhail",
/// null
/// ]
///
///
/// Keep rows with an age above 35. This will read O(N_KEPT) rows, when the file format allows.
///
Expand Down
13 changes: 10 additions & 3 deletions vortex-array/src/array/chunked/variants.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
use vortex_dtype::{DType, Field};
use itertools::Itertools;
use vortex_dtype::{DType, Field, FieldName};
use vortex_error::{vortex_err, vortex_panic, VortexExpect, VortexResult};

use crate::array::chunked::ChunkedArray;
Expand Down Expand Up @@ -84,7 +85,7 @@ impl StructArrayTrait for ChunkedArray {
Some(chunked)
}

fn project(&self, projection: &[Field]) -> VortexResult<ArrayData> {
fn project(&self, projection: &[FieldName]) -> VortexResult<ArrayData> {
let mut chunks = Vec::with_capacity(self.nchunks());
for chunk in self.chunks() {
chunks.push(
Expand All @@ -99,7 +100,13 @@ impl StructArrayTrait for ChunkedArray {
.dtype()
.as_struct()
.ok_or_else(|| vortex_err!("Not a struct dtype"))?
.project(projection)?;
.project(
projection
.iter()
.map(|f| Field::Name(f.clone()))
.collect_vec()
.as_slice(),
)?;
ChunkedArray::try_new(
chunks,
DType::Struct(projected_dtype, self.dtype().nullability()),
Expand Down
4 changes: 2 additions & 2 deletions vortex-array/src/array/constant/variants.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use vortex_dtype::Field;
use vortex_dtype::FieldName;
use vortex_error::{VortexError, VortexExpect as _, VortexResult};
use vortex_scalar::Scalar;

Expand Down Expand Up @@ -97,7 +97,7 @@ impl StructArrayTrait for ConstantArray {
.map(|scalar| ConstantArray::new(scalar, self.len()).into_array())
}

fn project(&self, projection: &[Field]) -> VortexResult<ArrayData> {
fn project(&self, projection: &[FieldName]) -> VortexResult<ArrayData> {
Ok(
ConstantArray::new(self.scalar().as_struct().project(projection)?, self.len())
.into_array(),
Expand Down
4 changes: 2 additions & 2 deletions vortex-array/src/array/sparse/variants.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use vortex_dtype::Field;
use vortex_dtype::FieldName;
use vortex_error::{vortex_err, VortexExpect, VortexResult};
use vortex_scalar::StructScalar;

Expand Down Expand Up @@ -87,7 +87,7 @@ impl StructArrayTrait for SparseArray {
)
}

fn project(&self, projection: &[Field]) -> VortexResult<ArrayData> {
fn project(&self, projection: &[FieldName]) -> VortexResult<ArrayData> {
let new_patches = self.patches().map_values(|values| {
values
.as_struct_array()
Expand Down
23 changes: 10 additions & 13 deletions vortex-array/src/array/struct_/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -110,19 +110,16 @@ impl StructArray {
/// perform column re-ordering, deletion, or duplication at a logical level, without any data
/// copying.
#[allow(clippy::same_name_method)]
pub fn project(&self, projection: &[Field]) -> VortexResult<Self> {
pub fn project(&self, projection: &[FieldName]) -> VortexResult<Self> {
let mut children = Vec::with_capacity(projection.len());
let mut names = Vec::with_capacity(projection.len());

for field in projection.iter() {
let idx = match field {
Field::Name(n) => self
.names()
.iter()
.position(|name| name == n)
.ok_or_else(|| vortex_err!("Unknown field {n}"))?,
Field::Index(i) => *i,
};
for f_name in projection.iter() {
let idx = self
.names()
.iter()
.position(|name| name == f_name)
.ok_or_else(|| vortex_err!("Unknown field {f_name}"))?;

names.push(self.names()[idx].clone());
children.push(
Expand Down Expand Up @@ -170,7 +167,7 @@ impl StructArrayTrait for StructArray {
)
}

fn project(&self, projection: &[Field]) -> VortexResult<ArrayData> {
fn project(&self, projection: &[FieldName]) -> VortexResult<ArrayData> {
self.project(projection).map(|a| a.into_array())
}
}
Expand Down Expand Up @@ -223,7 +220,7 @@ impl StatisticsVTable<StructArray> for StructEncoding {
#[cfg(test)]
mod test {
use vortex_buffer::buffer;
use vortex_dtype::{DType, Field, FieldName, FieldNames, Nullability};
use vortex_dtype::{DType, FieldName, FieldNames, Nullability};

use crate::array::primitive::PrimitiveArray;
use crate::array::struct_::StructArray;
Expand Down Expand Up @@ -251,7 +248,7 @@ mod test {
.unwrap();

let struct_b = struct_a
.project(&[Field::from(2usize), Field::from(0)])
.project(&[FieldName::from("zs"), FieldName::from("xs")])
.unwrap();
assert_eq!(
struct_b.names().as_ref(),
Expand Down
8 changes: 1 addition & 7 deletions vortex-array/src/arrow/array.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@ use arrow_array::{BinaryViewArray, GenericByteViewArray, GenericListArray, Strin
use arrow_buffer::buffer::{NullBuffer, OffsetBuffer};
use arrow_buffer::{ArrowNativeType, BooleanBuffer, Buffer, ScalarBuffer};
use arrow_schema::{DataType, TimeUnit as ArrowTimeUnit};
use itertools::Itertools;
use vortex_buffer::{Alignment, ByteBuffer};
use vortex_datetime_dtype::TimeUnit;
use vortex_dtype::{DType, NativePType, Nullability, PType};
Expand Down Expand Up @@ -166,12 +165,7 @@ impl FromArrowArray<&ArrowBooleanArray> for ArrayData {
impl FromArrowArray<&ArrowStructArray> for ArrayData {
fn from_arrow(value: &ArrowStructArray, nullable: bool) -> Self {
StructArray::try_new(
value
.column_names()
.iter()
.map(|s| (*s).into())
.collect_vec()
.into(),
value.column_names().iter().map(|s| (*s).into()).collect(),
value
.columns()
.iter()
Expand Down
8 changes: 2 additions & 6 deletions vortex-array/src/arrow/dtype.rs
Original file line number Diff line number Diff line change
Expand Up @@ -51,8 +51,7 @@ impl FromArrowType<SchemaRef> for DType {
.fields()
.iter()
.map(|f| f.name().as_str().into())
.collect_vec()
.into(),
.collect(),
value
.fields()
.iter()
Expand Down Expand Up @@ -91,10 +90,7 @@ impl FromArrowType<&Field> for DType {
}
DataType::Struct(f) => Struct(
StructDType::new(
f.iter()
.map(|f| f.name().as_str().into())
.collect_vec()
.into(),
f.iter().map(|f| f.name().as_str().into()).collect(),
f.iter().map(|f| Self::from_arrow(f.as_ref())).collect_vec(),
),
nullability,
Expand Down
4 changes: 1 addition & 3 deletions vortex-array/src/arrow/record_batch.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
use arrow_array::cast::AsArray;
use arrow_array::RecordBatch;
use arrow_schema::{DataType, Schema};
use itertools::Itertools;
use vortex_error::{vortex_err, VortexError, VortexResult};

use crate::array::StructArray;
Expand All @@ -19,8 +18,7 @@ impl TryFrom<RecordBatch> for ArrayData {
.fields()
.iter()
.map(|f| f.name().as_str().into())
.collect_vec()
.into(),
.collect(),
value
.columns()
.iter()
Expand Down
4 changes: 2 additions & 2 deletions vortex-array/src/variants.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@

use std::sync::Arc;

use vortex_dtype::{DType, ExtDType, Field, FieldInfo, FieldNames, PType};
use vortex_dtype::{DType, ExtDType, Field, FieldInfo, FieldName, FieldNames, PType};
use vortex_error::{vortex_panic, VortexError, VortexExpect as _, VortexResult};

use crate::encoding::Encoding;
Expand Down Expand Up @@ -228,7 +228,7 @@ pub trait StructArrayTrait: ArrayTrait {
}
}

fn project(&self, projection: &[Field]) -> VortexResult<ArrayData>;
fn project(&self, projection: &[FieldName]) -> VortexResult<ArrayData>;
}

pub trait ListArrayTrait: ArrayTrait {}
Expand Down
33 changes: 23 additions & 10 deletions vortex-datafusion/src/memory/exec.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,8 @@ use datafusion_execution::{SendableRecordBatchStream, TaskContext};
use datafusion_physical_plan::{DisplayAs, DisplayFormatType, ExecutionPlan, PlanProperties};
use vortex_array::array::ChunkedArray;
use vortex_array::{ArrayDType, ArrayLen};
use vortex_dtype::Field;
use vortex_error::VortexResult;
use vortex_dtype::{FieldName, FieldNames};
use vortex_error::{vortex_err, VortexResult};

use crate::memory::statistics::chunked_array_df_stats;
use crate::memory::stream::VortexRecordBatchStream;
Expand All @@ -17,7 +17,7 @@ use crate::memory::stream::VortexRecordBatchStream;
#[derive(Clone)]
pub struct VortexScanExec {
array: ChunkedArray,
scan_projection: Vec<usize>,
scan_projection: FieldNames,
plan_properties: PlanProperties,
statistics: Statistics,
}
Expand All @@ -28,7 +28,25 @@ impl VortexScanExec {
scan_projection: Vec<usize>,
plan_properties: PlanProperties,
) -> VortexResult<Self> {
let statistics = chunked_array_df_stats(&array, &scan_projection)?;
let dtype = array.dtype().as_struct().ok_or_else(|| {
vortex_err!(
"VortexScanExec: expected struct array, found {:?}",
array.dtype()
)
})?;
let scan_projection: FieldNames = scan_projection
.iter()
.map(|idx| {
dtype.names().get(*idx).cloned().ok_or_else(|| {
vortex_err!(
"VortexScanExec: invalid field index {idx} in dtype {:?}",
dtype.names()
)
})
})
.collect::<VortexResult<Vec<FieldName>>>()?
.into();
let statistics = chunked_array_df_stats(&array, scan_projection.clone())?;
Ok(Self {
array,
scan_projection,
Expand Down Expand Up @@ -91,12 +109,7 @@ impl ExecutionPlan for VortexScanExec {
idx: 0,
num_chunks: self.array.nchunks(),
chunks: self.array.clone(),
projection: self
.scan_projection
.iter()
.copied()
.map(Field::from)
.collect(),
projection: self.scan_projection.iter().cloned().collect(),
}))
}

Expand Down
Loading
Loading