Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Simplify SchemaTransform API #531

Merged
merged 2 commits into from
Nov 26, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 4 additions & 4 deletions kernel/src/scan/data_skipping.rs
Original file line number Diff line number Diff line change
Expand Up @@ -88,16 +88,16 @@ impl DataSkippingFilter {

// Convert a min/max stats schema into a nullcount schema (all leaf fields are LONG)
struct NullCountStatsTransform;
impl SchemaTransform for NullCountStatsTransform {
fn transform_primitive<'a>(
impl<'a> SchemaTransform<'a> for NullCountStatsTransform {
fn transform_primitive(
&mut self,
_ptype: Cow<'a, PrimitiveType>,
_ptype: &'a PrimitiveType,
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

minor nit:

Suggested change
_ptype: &'a PrimitiveType,
_: &'a PrimitiveType,

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I thought it was normal to keep the names of unused params in public method definitions?

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm actually not sure if there's "normal" on this one. I don't find it adds much here so suggested removing it, but I don't feel strongly either way

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

A quick regexp search over the kernel sources shows seven places that use _: Foo as a function argument and ~100 that use _foo: Foo. I think I'll leave this one as-is.

) -> Option<Cow<'a, PrimitiveType>> {
Some(Cow::Owned(PrimitiveType::Long))
}
}
let nullcount_schema = NullCountStatsTransform
.transform_struct(Cow::Borrowed(&minmax_schema))?
.transform_struct(&minmax_schema)?
.into_owned();
let stats_schema = Arc::new(StructType::new([
StructField::new("numRecords", DataType::LONG, true),
Expand Down
141 changes: 73 additions & 68 deletions kernel/src/schema.rs
Original file line number Diff line number Diff line change
Expand Up @@ -197,10 +197,10 @@ impl StructField {
}

struct ApplyNameMapping;
impl SchemaTransform for ApplyNameMapping {
fn transform_struct_field<'a>(
impl<'a> SchemaTransform<'a> for ApplyNameMapping {
fn transform_struct_field(
&mut self,
field: Cow<'a, StructField>,
field: &'a StructField,
) -> Option<Cow<'a, StructField>> {
let field = self.recurse_into_struct_field(field)?;
match field.get_config_value(&ColumnMetadataKey::ColumnMappingPhysicalName) {
Expand All @@ -212,7 +212,7 @@ impl StructField {
}
}

let field = ApplyNameMapping.transform_struct_field(Cow::Borrowed(self));
let field = ApplyNameMapping.transform_struct_field(self);
Ok(field.unwrap().into_owned())
}
}
Expand Down Expand Up @@ -284,7 +284,7 @@ impl StructType {
#[cfg_attr(feature = "developer-visibility", visibility::make(pub))]
pub(crate) fn leaves<'s>(&self, own_name: impl Into<Option<&'s str>>) -> ColumnNamesAndTypes {
let mut get_leaves = GetSchemaLeaves::new(own_name.into());
let _ = get_leaves.transform_struct(Cow::Borrowed(self));
let _ = get_leaves.transform_struct(self);
(get_leaves.names, get_leaves.types).into()
}
}
Expand Down Expand Up @@ -647,61 +647,73 @@ impl Display for DataType {
/// The provided `recurse_into_xxx` methods encapsulate the boilerplate work of recursing into the
/// child schema elements of each schema element. Implementations can call these as needed but will
/// generally not need to override them.
pub trait SchemaTransform {
pub trait SchemaTransform<'a> {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

does sharing the lifetime across methods make sense here? put another way: should the lifetimes of the types we are transforming all be the same? I suppose the answer is yes since each method is invokes as we are transforming some type and possibly recursing which would imply the same lifetime requirements?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This was an observation by @nicklan that allows the data skipping PR to not clone string field names it stores references to -- everything reachable through the entry point must survive at least as long as the entry point, so we can define a single lifetime for the entire traversal that visitors can take advantage of.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

okay yep makes sense! thank you!

/// Called for each primitive encountered during the schema traversal.
fn transform_primitive<'a>(
&mut self,
ptype: Cow<'a, PrimitiveType>,
) -> Option<Cow<'a, PrimitiveType>> {
Some(ptype)
fn transform_primitive(&mut self, ptype: &'a PrimitiveType) -> Option<Cow<'a, PrimitiveType>> {
Some(Cow::Borrowed(ptype))
}

/// Called for each struct encountered during the schema traversal. Implementations can call
/// [`Self::recurse_into_struct`] if they wish to recursively transform the struct's fields.
fn transform_struct<'a>(&mut self, stype: Cow<'a, StructType>) -> Option<Cow<'a, StructType>> {
fn transform_struct(&mut self, stype: &'a StructType) -> Option<Cow<'a, StructType>> {
self.recurse_into_struct(stype)
}

/// Called for each struct field encountered during the schema traversal. Implementations can
/// call [`Self::recurse_into_struct_field`] if they wish to recursively transform the field's
/// data type.
fn transform_struct_field<'a>(
&mut self,
field: Cow<'a, StructField>,
) -> Option<Cow<'a, StructField>> {
fn transform_struct_field(&mut self, field: &'a StructField) -> Option<Cow<'a, StructField>> {
self.recurse_into_struct_field(field)
}

/// Called for each array encountered during the schema traversal. Implementations can call
/// [`Self::recurse_into_array`] if they wish to recursively transform the array's element type.
fn transform_array<'a>(&mut self, atype: Cow<'a, ArrayType>) -> Option<Cow<'a, ArrayType>> {
fn transform_array(&mut self, atype: &'a ArrayType) -> Option<Cow<'a, ArrayType>> {
self.recurse_into_array(atype)
}

/// Called for each array element encountered during the schema traversal. Implementations can
/// call [`Self::transform`] if they wish to recursively transform the array element type.
fn transform_array_element(&mut self, etype: &'a DataType) -> Option<Cow<'a, DataType>> {
self.transform(etype)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

curious: why do we have these transform_array_element/map_key/map_val that just wrap transform?

Copy link
Collaborator Author

@scovich scovich Nov 26, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

They're missing hook points this PR adds. The nested data skipping and column mapping PR will use them to improve its error messages (because column mapping transformation needs to recurse through maps and arrays, and if something goes wrong inside them, the error message should make it clear where the problem was.

(there's no recurse_into_xxx because no additional boilerplate is needed)

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

thanks :)

}

/// Called for each map encountered during the schema traversal. Implementations can call
/// [`Self::recurse_into_map`] if they wish to recursively transform the map's key and/or value
/// types.
fn transform_map<'a>(&mut self, mtype: Cow<'a, MapType>) -> Option<Cow<'a, MapType>> {
fn transform_map(&mut self, mtype: &'a MapType) -> Option<Cow<'a, MapType>> {
self.recurse_into_map(mtype)
}

/// Called for each map key encountered during the schema traversal. Implementations can call
/// [`Self::transform`] if they wish to recursively transform the map key type.
fn transform_map_key(&mut self, etype: &'a DataType) -> Option<Cow<'a, DataType>> {
self.transform(etype)
}

/// Called for each map value encountered during the schema traversal. Implementations can call
/// [`Self::transform`] if they wish to recursively transform the map value type.
fn transform_map_value(&mut self, etype: &'a DataType) -> Option<Cow<'a, DataType>> {
self.transform(etype)
}

/// General entry point for a recursive traversal over any data type. Also invoked internally to
/// dispatch on nested data types encountered during the traversal.
fn transform<'a>(&mut self, data_type: Cow<'a, DataType>) -> Option<Cow<'a, DataType>> {
fn transform(&mut self, data_type: &'a DataType) -> Option<Cow<'a, DataType>> {
use Cow::*;
use DataType::*;

// quick boilerplate helper
macro_rules! apply_transform {
( $transform_fn:ident, $arg:ident ) => {
match self.$transform_fn(Borrowed($arg)) {
Some(Borrowed(_)) => Some(data_type),
match self.$transform_fn($arg) {
Some(Borrowed(_)) => Some(Borrowed(data_type)),
Some(Owned(inner)) => Some(Owned(inner.into())),
None => None,
}
};
}
match data_type.as_ref() {
match data_type {
Primitive(ptype) => apply_transform!(transform_primitive, ptype),
Array(atype) => apply_transform!(transform_array, atype),
Struct(stype) => apply_transform!(transform_struct, stype),
Expand All @@ -711,13 +723,13 @@ pub trait SchemaTransform {

/// Recursively transforms a struct field's data type. If the data type changes, update the
/// field to reference it. Otherwise, no-op.
fn recurse_into_struct_field<'a>(
fn recurse_into_struct_field(
&mut self,
field: Cow<'a, StructField>,
field: &'a StructField,
) -> Option<Cow<'a, StructField>> {
use Cow::*;
let field = match self.transform(Borrowed(&field.data_type))? {
Borrowed(_) => field,
let field = match self.transform(&field.data_type)? {
Borrowed(_) => Borrowed(field),
Owned(new_data_type) => Owned(StructField {
name: field.name.clone(),
data_type: new_data_type,
Expand All @@ -730,36 +742,37 @@ pub trait SchemaTransform {

/// Recursively transforms a struct's fields. If one or more fields were changed or removed,
/// update the struct to reference all surviving fields. Otherwise, no-op.
fn recurse_into_struct<'a>(
&mut self,
stype: Cow<'a, StructType>,
) -> Option<Cow<'a, StructType>> {
fn recurse_into_struct(&mut self, stype: &'a StructType) -> Option<Cow<'a, StructType>> {
use Cow::*;
let mut num_borrowed = 0;
let fields: Vec<_> = stype
.fields()
.filter_map(|field| self.transform_struct_field(Borrowed(field)))
.filter_map(|field| self.transform_struct_field(field))
.inspect(|field| {
if let Borrowed(_) = field {
num_borrowed += 1;
}
})
.collect();
let stype = if num_borrowed < stype.fields.len() {

if fields.is_empty() {
None
} else if num_borrowed < stype.fields.len() {
// At least one field was changed or filtered out, so make a new struct
Owned(StructType::new(fields.into_iter().map(|f| f.into_owned())))
Some(Owned(StructType::new(
fields.into_iter().map(|f| f.into_owned()),
)))
} else {
stype
};
Some(stype)
Some(Borrowed(stype))
}
}

/// Recursively transforms an array's element type. If the element type changes, update the
/// array to reference it. Otherwise, no-op.
fn recurse_into_array<'a>(&mut self, atype: Cow<'a, ArrayType>) -> Option<Cow<'a, ArrayType>> {
fn recurse_into_array(&mut self, atype: &'a ArrayType) -> Option<Cow<'a, ArrayType>> {
use Cow::*;
let atype = match self.transform(Borrowed(&atype.element_type))? {
Borrowed(_) => atype,
let atype = match self.transform_array_element(&atype.element_type)? {
Borrowed(_) => Borrowed(atype),
Owned(element_type) => Owned(ArrayType {
type_name: atype.type_name.clone(),
element_type,
Expand All @@ -771,12 +784,12 @@ pub trait SchemaTransform {

/// Recursively transforms a map's key and value types. If either one changes, update the map to
/// reference them. If either one is removed, remove the map as well. Otherwise, no-op.
fn recurse_into_map<'a>(&mut self, mtype: Cow<'a, MapType>) -> Option<Cow<'a, MapType>> {
fn recurse_into_map(&mut self, mtype: &'a MapType) -> Option<Cow<'a, MapType>> {
use Cow::*;
let key_type = self.transform(Borrowed(&mtype.key_type))?;
let value_type = self.transform(Borrowed(&mtype.value_type))?;
let key_type = self.transform_map_key(&mtype.key_type)?;
let value_type = self.transform_map_value(&mtype.value_type)?;
let mtype = match (&key_type, &value_type) {
(Borrowed(_), Borrowed(_)) => mtype,
(Borrowed(_), Borrowed(_)) => Borrowed(mtype),
_ => Owned(MapType {
type_name: mtype.type_name.clone(),
key_type: key_type.into_owned(),
Expand All @@ -803,11 +816,8 @@ impl GetSchemaLeaves {
}
}

impl SchemaTransform for GetSchemaLeaves {
fn transform_struct_field<'a>(
&mut self,
field: Cow<'a, StructField>,
) -> Option<Cow<'a, StructField>> {
impl<'a> SchemaTransform<'a> for GetSchemaLeaves {
fn transform_struct_field(&mut self, field: &StructField) -> Option<Cow<'a, StructField>> {
self.path.push(field.name.clone());
if let DataType::Struct(_) = field.data_type {
let _ = self.recurse_into_struct_field(field);
Expand Down Expand Up @@ -845,47 +855,42 @@ impl SchemaDepthChecker {
current_depth: 0,
call_count: 0,
};
checker.transform(Cow::Borrowed(data_type));
checker.transform(data_type);
(checker.max_depth_seen, checker.call_count)
}

// Triggers the requested recursion only doing so would not exceed the depth limit.
fn depth_limited<T: std::fmt::Debug>(
fn depth_limited<'a, T: Clone + std::fmt::Debug>(
&mut self,
recurse: impl FnOnce(&mut Self, T) -> Option<T>,
arg: T,
) -> Option<T> {
recurse: impl FnOnce(&mut Self, &'a T) -> Option<Cow<'a, T>>,
arg: &'a T,
) -> Option<Cow<'a, T>> {
self.call_count += 1;
if self.max_depth_seen < self.current_depth {
self.max_depth_seen = self.current_depth;
if self.depth_limit < self.current_depth {
tracing::warn!("Max schema depth {} exceeded by {arg:?}", self.depth_limit);
}
}
if self.depth_limit < self.max_depth_seen {
return Some(arg); // back out the recursion, we're done
if self.max_depth_seen <= self.depth_limit {
self.current_depth += 1;
let _ = recurse(self, arg);
self.current_depth -= 1;
}

self.current_depth += 1;
let result = recurse(self, arg);
self.current_depth -= 1;
result
None
}
}
impl SchemaTransform for SchemaDepthChecker {
fn transform_struct<'a>(&mut self, stype: Cow<'a, StructType>) -> Option<Cow<'a, StructType>> {
impl<'a> SchemaTransform<'a> for SchemaDepthChecker {
fn transform_struct(&mut self, stype: &'a StructType) -> Option<Cow<'a, StructType>> {
self.depth_limited(Self::recurse_into_struct, stype)
}
fn transform_struct_field<'a>(
&mut self,
field: Cow<'a, StructField>,
) -> Option<Cow<'a, StructField>> {
fn transform_struct_field(&mut self, field: &'a StructField) -> Option<Cow<'a, StructField>> {
self.depth_limited(Self::recurse_into_struct_field, field)
}
fn transform_array<'a>(&mut self, atype: Cow<'a, ArrayType>) -> Option<Cow<'a, ArrayType>> {
fn transform_array(&mut self, atype: &'a ArrayType) -> Option<Cow<'a, ArrayType>> {
self.depth_limited(Self::recurse_into_array, atype)
}
fn transform_map<'a>(&mut self, mtype: Cow<'a, MapType>) -> Option<Cow<'a, MapType>> {
fn transform_map(&mut self, mtype: &'a MapType) -> Option<Cow<'a, MapType>> {
self.depth_limited(Self::recurse_into_map, mtype)
}
}
Expand Down
Loading