[WIP][VARIANT] Support reading basic variant #259
base: main
Conversation
Made an initial pass. Overall impression is that we "care too much" about variant in "too many places" -- which complicates the code and makes it error-prone. Not sure the best way to address this shortcoming (differentiating logical vs. physical types might help)?
Two high-level questions I wasn't able to answer during the code review:
- Is there a "clean" boundary between the variant logical type and the physical struct-of-binary type+data? Ideally, that boundary should be enforced very early in the query's lifecycle, so that almost all the kernel code doesn't have to care about it, and simply sees a struct that happens to carry a metadata field?
- What does VariantCoalesce really do? I think it just ensures that each variant column has the correct number and ordering of physical sub-fields. But it seems like that should happen automatically (and much more cleanly) if we send the correct read schema to the file readers?
.metadata()
    .iter()
    .map(|(key, val)| Ok((key.clone(), serde_json::to_string(val)?)))
-   .collect::<Result<_, serde_json::Error>>()
+   .collect::<Result<HashMap<String, String>, serde_json::Error>>()
I believe you can get rid of the turbofish by:
use itertools::Itertools;
...
let mut metadata: HashMap<_, _> = f
    .metadata
    .iter()
    .map(...)
    .try_collect()
    .map_err(...)?;
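For readers unfamiliar with the trick being suggested: the turbofish can already be avoided with the standard library alone by annotating the binding, which is the same inference mechanism itertools' try_collect leans on. A minimal std-only sketch (the names and data here are illustrative, not the PR's actual code):

```rust
use std::collections::HashMap;

fn main() {
    let fields = vec![("a", 1), ("b", 2)];
    // Annotating the binding lets `collect` infer its target type,
    // so no turbofish is needed even when collecting into a Result.
    let metadata: Result<HashMap<String, String>, String> = fields
        .into_iter()
        .map(|(k, v)| Ok((k.to_string(), v.to_string())))
        .collect();
    let metadata = metadata.unwrap();
    assert_eq!(metadata.len(), 2);
    assert_eq!(metadata.get("a").map(String::as_str), Some("1"));
}
```

itertools' try_collect additionally avoids the intermediate Result binding, which is why it reads a bit cleaner in the suggestion above.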
ArrowField::new("value", ArrowDataType::Binary, false),
ArrowField::new("metadata", ArrowDataType::Binary, false),
I believe those need to be nullable, since a null variant field will be physically represented as a null entry in these arrays?
@@ -89,6 +90,10 @@ impl Scalar {
        Decimal128Array::new_null(num_rows)
            .with_precision_and_scale(*precision, *scale as i8)?,
    ),
    // TODO(r.chen): Fill this out correctly.
    PrimitiveType::Variant => {
        panic!("UNSUPPORTED - VARIANT DOESNT HAVE AN ARROW TYPE")
nit: could also just do
PrimitiveType::Variant => unimplemented!(),
... but wouldn't the following suffice to implement the required functionality?
PrimitiveType::Variant => {
    // Arrow has no variant type, so we have to emit a null struct-of-arrays instead.
    let arrow_variant = ArrowDataType::try_from(DataType::VARIANT)?;
    let physical_variant = DataType::try_from(arrow_variant)?;
    Scalar::Null(physical_variant).to_array(num_rows)
}
That said, it's a bit troubling that a round trip from kernel to arrow and back is not idempotent?
Do we need to introduce a concept of physical vs. logical data types in kernel, so that variant (logical type) is simply an array-of-struct (physical type) everywhere?
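One way to picture the logical-vs-physical split being proposed: lower the variant logical type to its struct-of-binary physical type exactly once, early in the query lifecycle, so downstream code only ever sees physical types. The sketch below uses hypothetical illustrative names, not kernel's actual API:

```rust
// Hypothetical sketch of separating a logical Variant type from its
// physical struct-of-binary representation.
#[derive(Debug, Clone, PartialEq)]
enum PhysicalType {
    Binary,
    Struct(Vec<(String, PhysicalType)>),
}

#[derive(Debug, PartialEq)]
enum LogicalType {
    Variant,
    Physical(PhysicalType),
}

impl LogicalType {
    // Lowering happens once, early; everything downstream sees only
    // physical types, so the round trip stays idempotent.
    fn to_physical(&self) -> PhysicalType {
        match self {
            // The only place that needs to know variant's physical shape.
            LogicalType::Variant => PhysicalType::Struct(vec![
                ("value".to_string(), PhysicalType::Binary),
                ("metadata".to_string(), PhysicalType::Binary),
            ]),
            LogicalType::Physical(p) => p.clone(),
        }
    }
}

fn main() {
    let physical = LogicalType::Variant.to_physical();
    // Lowering an already-physical type is a no-op, by construction.
    assert_eq!(LogicalType::Physical(physical.clone()).to_physical(), physical);
}
```

Under this scheme the kernel-to-arrow round trip that motivated the comment stays within physical types, which is what makes it idempotent.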
@@ -228,13 +233,131 @@ fn ensure_data_types(kernel_type: &DataType, arrow_type: &ArrowDataType) -> Delt
    }
    Ok(())
}
(DataType::Primitive(PrimitiveType::Variant), ArrowDataType::Struct(_)) => Ok(()),
Don't we need to verify the struct's schema? Otherwise weird errors could arise downstream...
Seems like we just need a recursive call to ensure_data_types, where the first arg is the physical variant schema (see comment on the null to_array case above), and the second arg is whatever got passed in?
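The recursive check being suggested can be sketched in miniature with only the standard library; the type enum and function names below are hypothetical stand-ins for kernel's DataType and ensure_data_types:

```rust
// Hypothetical sketch: verify a variant column's arrow representation by
// recursing with the expected physical variant schema as the first argument.
#[derive(Debug, PartialEq)]
enum Ty {
    Binary,
    Struct(Vec<(String, Ty)>),
}

// The physical struct-of-binary shape a variant column must have.
fn physical_variant() -> Ty {
    Ty::Struct(vec![
        ("value".to_string(), Ty::Binary),
        ("metadata".to_string(), Ty::Binary),
    ])
}

fn ensure_types(expected: &Ty, actual: &Ty) -> Result<(), String> {
    match (expected, actual) {
        (Ty::Binary, Ty::Binary) => Ok(()),
        (Ty::Struct(a), Ty::Struct(b)) if a.len() == b.len() => {
            a.iter().zip(b).try_for_each(|((an, at), (bn, bt))| {
                if an != bn {
                    return Err(format!("field mismatch: {an} vs {bn}"));
                }
                ensure_types(at, bt) // recurse into children
            })
        }
        _ => Err(format!("type mismatch: {expected:?} vs {actual:?}")),
    }
}

fn main() {
    assert!(ensure_types(&physical_variant(), &physical_variant()).is_ok());
    // A bare binary column is rejected instead of failing weirdly downstream.
    assert!(ensure_types(&physical_variant(), &Ty::Binary).is_err());
}
```

The point is that accepting any ArrowDataType::Struct unchecked defers the failure to wherever the value/metadata fields are first read.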
let new_arrays = arrays
    .into_iter()
    .zip(kernel_struct_type.fields().into_iter())
    .map(|(child_arr, struct_field)| {
        variant_coalesce_impl(child_arr, struct_field.data_type())
    })
    .collect::<Result<Vec<_>, _>>()?;
If you use itertools::Itertools, then this code simplifies to:
let new_arrays: Vec<_> = arrays
    ...
    .try_collect()?;
        ))
    }
}
_ => Ok(Arc::new(arr)),
I think this code only succeeds by accident, because Arc implements the AsRef trait...
- _ => Ok(Arc::new(arr)),
+ _ => Ok(arr),
        variant_coalesce_results.push(coalesced_batch);
    }
}
None => (),
This doesn't look right... if variant is disabled then the entire query result is thrown away?
Seems like it would be a lot cleaner to have a two pass approach:
- At query compilation time, verify that variant mode is actually enabled if any variant column is found in the schema
- At runtime, unconditionally apply VariantCoalesce, since it's anyway a no-op for non-variant columns
Then we don't need all this option and if/else control flow here?
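The two-pass idea can be sketched as follows; the names (validate_plan, coalesce_column, ColumnType) are hypothetical stand-ins, not kernel's actual API:

```rust
#[derive(Clone, Debug, PartialEq)]
enum ColumnType {
    Variant,
    Other,
}

// Pass 1: at query compilation time, reject plans that contain variant
// columns while variant support is disabled.
fn validate_plan(schema: &[ColumnType], variant_enabled: bool) -> Result<(), String> {
    if !variant_enabled && schema.contains(&ColumnType::Variant) {
        return Err("variant column in schema but variant support is disabled".to_string());
    }
    Ok(())
}

// Pass 2: at runtime, apply the coalesce step unconditionally; it is a
// no-op for non-variant columns, so no Option/if-else plumbing is needed.
fn coalesce_column(ty: &ColumnType, data: Vec<u8>) -> Vec<u8> {
    match ty {
        ColumnType::Variant => data, // real shredded-variant rebuild would go here
        ColumnType::Other => data,   // genuinely a no-op
    }
}

fn main() {
    assert!(validate_plan(&[ColumnType::Variant], false).is_err());
    assert!(validate_plan(&[ColumnType::Variant], true).is_ok());
    assert_eq!(coalesce_column(&ColumnType::Other, vec![1, 2]), vec![1, 2]);
}
```

Because the error case is caught at plan time, the runtime path never needs to branch on whether variant is enabled, which removes the None => () hole noted above.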
@@ -418,7 +419,8 @@ impl Display for PrimitiveType {
    PrimitiveType::TimestampNtz => write!(f, "timestamp_ntz"),
    PrimitiveType::Decimal(precision, scale) => {
        write!(f, "decimal({},{})", precision, scale)
-   }
+   },
I don't think commas are required after case arms that use curly braces?
    Ok::<usize, Error>(idx)
} else {
    Err(Error::invalid_variant_representation(
        "\"value\" field is not of binary type.",
I think we usually surround column names with ' rather than "?
(better match with SQL, but also avoids needing escape characters here)
(several more below)
if let Some(struct_array) = arr.as_any().downcast_ref::<StructArray>() {
    let (fields, arrays, nulls) = struct_array.clone().into_parts();

    let value_idx = if let Some((idx, field)) = fields.find("value") {
I think it's ok to require the physical schema to be in a particular order, rather than checking at runtime like this. In theory, kernel is -- or could be -- in charge of the schema that goes to parquet reader, so it should always match?
Also: If we do fix the field ordering, can the VariantCoalesce operation (and variant_coalesce_impl function) just disappear? Or are they accomplishing something else besides field reordering that I didn't notice?
hey @scovich, thanks for taking a look! Sorry for the very late reply, this PR is very much a WIP and was more of a hack for me to familiarize myself with the code base (and rust). But to answer some of your questions:
This makes sense. One thing I've discussed with Nick is a new API where the kernel can receive the parquet schema through the parquet footer. This way, the kernel will see just a struct (with a metadata field). The details of this are still being discussed, though.
yep, currently it doesn't really have a purpose. I have it in this PR just so I can understand the expression framework, but in the future, we'd like to have an expression which will "reconstruct" shredded variants. We intend that this "variant_coalesce" expression will do this in the future - i.e. the engine's scan will return an arbitrary struct, and the "variant_coalesce" expression will process it to return the struct of binaries.
Still very much WIP.
Implements basic variant support.
To read variants, engines can read physical variants from storage as structs and implement (or use the default implementation of) the "variant_coalesce" function, which is intended to create a Variant column vector from the struct input.
Because kernel-defaults uses Arrow as its in-memory data representation (and there isn't currently an arrow variant type), variants in kernel-defaults are simply an arrow struct with "value" and "metadata" binary child fields. A piece of metadata "isVariant" is inserted onto the variant struct field to differentiate between a struct and variant.
In the post-shredding future, the "variant_coalesce" function (which is currently more or less a no-op) will be used to rebuild shredded variants into their fully encoded representation.
Tested using an external golden table with variants and nested variants.
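The representation described in this PR can be sketched in miniature; the Field/DataType definitions below are hypothetical illustrations of the idea, not the actual kernel or arrow-rs types:

```rust
use std::collections::HashMap;

// Hypothetical sketch: since Arrow has no variant type, a variant is modeled
// as a struct with binary "value" and "metadata" children, tagged with an
// "isVariant" metadata marker to tell it apart from an ordinary struct.
#[derive(Debug, Clone, PartialEq)]
enum DataType {
    Binary,
    Struct(Vec<Field>),
}

#[derive(Debug, Clone, PartialEq)]
struct Field {
    name: String,
    data_type: DataType,
    nullable: bool,
    metadata: HashMap<String, String>,
}

fn variant_field(name: &str) -> Field {
    let child = |n: &str| Field {
        name: n.to_string(),
        data_type: DataType::Binary,
        nullable: true, // a null variant is a null entry in these arrays
        metadata: HashMap::new(),
    };
    Field {
        name: name.to_string(),
        data_type: DataType::Struct(vec![child("value"), child("metadata")]),
        nullable: true,
        metadata: HashMap::from([("isVariant".to_string(), "true".to_string())]),
    }
}

// Engines distinguish a variant from a plain struct via the marker.
fn is_variant(f: &Field) -> bool {
    f.metadata.get("isVariant").map(String::as_str) == Some("true")
}

fn main() {
    let v = variant_field("payload");
    assert!(is_variant(&v));
}
```

In the post-shredding future described above, "variant_coalesce" would rebuild an arbitrary shredded struct back into this two-binary-field shape.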