Skip to content

Commit

Permalink
[CHORE] changes to partition field and field creation (#1537)
Browse files Browse the repository at this point in the history
  • Loading branch information
samster25 authored Oct 26, 2023
1 parent ba1eb50 commit 17497a5
Show file tree
Hide file tree
Showing 5 changed files with 21 additions and 3 deletions.
2 changes: 2 additions & 0 deletions daft/daft.pyi
Original file line number Diff line number Diff line change
Expand Up @@ -523,6 +523,8 @@ class PyDataType:

class PyField:
def name(self) -> str: ...
@staticmethod
def create(name: str, datatype: PyDataType) -> PyField: ...
def dtype(self) -> PyDataType: ...
def eq(self, other: PyField) -> bool: ...
def __reduce__(self) -> tuple: ...
Expand Down
5 changes: 5 additions & 0 deletions daft/logical/schema.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,11 @@ def _from_pyfield(field: _PyField) -> Field:
f._field = field
return f

@staticmethod
def create(name: str, dtype: DataType) -> Field:
pyfield = _PyField.create(name, dtype._dtype)
return Field._from_pyfield(pyfield)

@property
def name(self):
return self._field.name()
Expand Down
5 changes: 5 additions & 0 deletions src/daft-core/src/python/field.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,11 @@ pub struct PyField {

#[pymethods]
impl PyField {
#[staticmethod]
pub fn create(name: &str, data_type: PyDataType) -> PyResult<Self> {
Ok(datatypes::Field::new(name, data_type.dtype).into())
}

pub fn name(&self) -> PyResult<String> {
Ok(self.field.name.clone())
}
Expand Down
4 changes: 2 additions & 2 deletions src/daft-scan/src/anonymous.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ use std::fmt::Display;
use common_error::DaftResult;
use daft_core::schema::SchemaRef;

use crate::{DataFileSource, FileType, ScanOperator, ScanOperatorRef, ScanTask};
use crate::{DataFileSource, FileType, PartitionField, ScanOperator, ScanOperatorRef, ScanTask};
#[derive(Debug)]
pub struct AnonymousScanOperator {
schema: SchemaRef,
Expand Down Expand Up @@ -36,7 +36,7 @@ impl ScanOperator for AnonymousScanOperator {
self.schema.clone()
}

fn partitioning_keys(&self) -> &[daft_core::datatypes::Field] {
fn partitioning_keys(&self) -> &[PartitionField] {
&[]
}

Expand Down
8 changes: 7 additions & 1 deletion src/daft-scan/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -70,10 +70,16 @@ pub struct ScanTask {
columns: Option<Vec<String>>,
limit: Option<usize>,
}
#[derive(Serialize, Deserialize)]
pub struct PartitionField {
field: Field,
source_field: Option<Field>,
transform: Option<Expr>,
}

pub trait ScanOperator: Send + Display {
fn schema(&self) -> SchemaRef;
fn partitioning_keys(&self) -> &[Field];
fn partitioning_keys(&self) -> &[PartitionField];
fn num_partitions(&self) -> DaftResult<usize>;

// also returns a bool to indicate if the scan operator can "absorb" the predicate
Expand Down

0 comments on commit 17497a5

Please sign in to comment.