[WIP][VARIANT] Support reading basic variant #259

Open · wants to merge 2 commits into base: main
24 changes: 19 additions & 5 deletions kernel/src/engine/arrow_conversion.rs
@@ -1,10 +1,11 @@
//! Conversions from kernel types to arrow types

use std::collections::HashMap;
use std::sync::Arc;

use arrow_schema::{
ArrowError, DataType as ArrowDataType, Field as ArrowField, Schema as ArrowSchema,
SchemaRef as ArrowSchemaRef, TimeUnit,
ArrowError, DataType as ArrowDataType, Field as ArrowField, Fields as ArrowFields,
Schema as ArrowSchema, SchemaRef as ArrowSchemaRef, TimeUnit,
};
use itertools::Itertools;

@@ -26,13 +27,17 @@ impl TryFrom<&StructField> for ArrowField {
type Error = ArrowError;

fn try_from(f: &StructField) -> Result<Self, ArrowError> {
let metadata = f
let mut metadata = f
.metadata()
.iter()
.map(|(key, val)| Ok((key.clone(), serde_json::to_string(val)?)))
.collect::<Result<_, serde_json::Error>>()
.collect::<Result<HashMap<String, String>, serde_json::Error>>()
Collaborator: I believe you can get rid of the turbofish by:

use itertools::Itertools;
  ...
let mut metadata: HashMap<_, _> = f
    .metadata
    .iter()
    .map(...)
    .try_collect()
    .map_err(...)?;
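
For concreteness, that sketch spelled out against the diff above (untested; the map_err closure needs a type annotation so try_collect can infer its error type):

let mut metadata: HashMap<_, _> = f
    .metadata()
    .iter()
    .map(|(key, val)| Ok((key.clone(), serde_json::to_string(val)?)))
    .try_collect()
    .map_err(|err: serde_json::Error| ArrowError::JsonError(err.to_string()))?;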

.map_err(|err| ArrowError::JsonError(err.to_string()))?;

if f.data_type == DataType::VARIANT {
metadata.insert("isVariant".to_string(), "true".to_string());
}

let field = ArrowField::new(
f.name(),
ArrowDataType::try_from(f.data_type())?,
@@ -111,7 +116,16 @@ impl TryFrom<&DataType> for ArrowDataType {
)),
PrimitiveType::TimestampNtz => {
Ok(ArrowDataType::Timestamp(TimeUnit::Microsecond, None))
}
},
// The Delta spec does not enforce the field indexes of the "value" and
// "metadata" fields. However, within kernel defaults, the "variant_coalesce"
// expression will always return variants with this physical representation.
PrimitiveType::Variant => Ok(ArrowDataType::Struct(ArrowFields::from(
vec![
ArrowField::new("value", ArrowDataType::Binary, false),
ArrowField::new("metadata", ArrowDataType::Binary, false),
Comment on lines +125 to +126
Collaborator: I believe those need to be nullable, since a null variant field will be physically represented as a null entry in these arrays?
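
A sketch of the nullable version of those two fields:

ArrowField::new("value", ArrowDataType::Binary, true),
ArrowField::new("metadata", ArrowDataType::Binary, true),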

]
)))
}
}
DataType::Struct(s) => Ok(ArrowDataType::Struct(
131 changes: 130 additions & 1 deletion kernel/src/engine/arrow_expression.rs
@@ -1,12 +1,13 @@
//! Expression handling based on arrow-rs compute kernels.
use std::primitive;
use std::sync::Arc;

use arrow_arith::boolean::{and_kleene, is_null, not, or_kleene};
use arrow_arith::numeric::{add, div, mul, sub};
use arrow_array::cast::AsArray;
use arrow_array::{
Array, ArrayRef, BinaryArray, BooleanArray, Date32Array, Datum, Decimal128Array, Float32Array,
Float64Array, Int16Array, Int32Array, Int64Array, Int8Array, ListArray, RecordBatch,
Float64Array, Int16Array, Int32Array, Int64Array, Int8Array, ListArray, MapArray, RecordBatch,
StringArray, StructArray, TimestampMicrosecondArray,
};
use arrow_ord::cmp::{distinct, eq, gt, gt_eq, lt, lt_eq, neq};
@@ -89,6 +90,10 @@ impl Scalar {
Decimal128Array::new_null(num_rows)
.with_precision_and_scale(*precision, *scale as i8)?,
),
// TODO(r.chen): Fill this out correctly.
PrimitiveType::Variant => {
panic!("UNSUPPORTED - VARIANT DOESNT HAVE AN ARROW TYPE")
Collaborator: nit: could also just do

PrimitiveType::Variant => unimplemented!(),

... but wouldn't the following suffice to implement the required functionality?

PrimitiveType::Variant => {
    // Arrow has no variant type, so we have to emit a null struct-of-arrays instead.
    let arrow_variant = ArrowDataType::try_from(DataType::VARIANT)?;
    let physical_variant = DataType::try_from(arrow_variant)?;
    Scalar::Null(physical_variant).to_array(num_rows)
}

Collaborator: That said, it's a bit troubling that a round trip from kernel to arrow and back is not idempotent?
Do we need to introduce a concept of physical vs. logical data types in kernel, so that variant (logical type) is simply an array-of-struct (physical type) everywhere?

}
},
DataType::Struct(t) => {
let fields: Fields = t.fields().map(ArrowField::try_from).try_collect()?;
@@ -228,13 +233,131 @@ fn ensure_data_types(kernel_type: &DataType, arrow_type: &ArrowDataType) -> Delt
}
Ok(())
}
(DataType::Primitive(PrimitiveType::Variant), ArrowDataType::Struct(_)) => Ok(()),
Collaborator: Don't we need to verify the struct's schema? Otherwise weird errors could arise downstream...

Seems like we just need a recursive call to ensure_data_types, where the first arg is the physical variant schema (see comment on the null to_array case above), and the second arg is whatever got passed in?
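
A minimal sketch of that recursive check, reusing the arrow round trip from the to_array comment above (error conversions elided, so this is illustrative rather than drop-in):

(DataType::Primitive(PrimitiveType::Variant), arrow_type) => {
    // The physical shape of a variant is struct<value: binary, metadata: binary>;
    // derive it once, then recurse so the struct's schema is actually verified.
    let arrow_variant = ArrowDataType::try_from(&DataType::VARIANT)?;
    let physical_variant = DataType::try_from(&arrow_variant)?;
    ensure_data_types(&physical_variant, arrow_type)
}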

_ => Err(make_arrow_error(format!(
"Incorrect datatype. Expected {}, got {}",
kernel_type, arrow_type
))),
}
}

fn variant_coalesce_impl(arr: Arc<dyn Array>, kernel_dt: &DataType) -> DeltaResult<ArrayRef> {
Collaborator: Isn't ArrayRef just Arc<dyn Array>?

Suggested change
fn variant_coalesce_impl(arr: Arc<dyn Array>, kernel_dt: &DataType) -> DeltaResult<ArrayRef> {
fn variant_coalesce_impl(arr: ArrayRef, kernel_dt: &DataType) -> DeltaResult<ArrayRef> {

// TODO(r.chen): Figure out how bad these clone() calls are.
match kernel_dt {
DataType::Struct(kernel_struct_type) => {
if let Some(struct_array) = arr.as_any().downcast_ref::<StructArray>() {
let (fields, arrays, nulls) = struct_array.clone().into_parts();
let new_arrays = arrays
.into_iter()
.zip(kernel_struct_type.fields().into_iter())
.map(|(child_arr, struct_field)| {
variant_coalesce_impl(child_arr, struct_field.data_type())
})
.collect::<Result<Vec<_>, _>>()?;
Comment on lines +250 to +256
Collaborator: If you use itertools::Itertools, then this code simplifies to:

                let new_arrays: Vec<_> = arrays
                      ...
                    .try_collect()?;
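
Applied to the chain above, the full expression would read (a sketch):

let new_arrays: Vec<_> = arrays
    .into_iter()
    .zip(kernel_struct_type.fields().into_iter())
    .map(|(child_arr, struct_field)| {
        variant_coalesce_impl(child_arr, struct_field.data_type())
    })
    .try_collect()?;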

Ok(Arc::new(StructArray::new(
// TODO(r.chen): Should we be setting this with the new isVariant metadata?
// or will that be handled elsewhere?
fields, new_arrays, nulls,
)))
} else {
Err(Error::unexpected_column_type(kernel_dt))
}
}
DataType::Array(kernel_array_type) => {
if let Some(list_array) = arr.as_any().downcast_ref::<ListArray>() {
let (field, offsets, values, nulls) = list_array.clone().into_parts();
let new_values =
variant_coalesce_impl(values.clone(), kernel_array_type.element_type())?;
Ok(Arc::<ListArray>::new(ListArray::new(
// TODO(r.chen): Should we be setting this with the new isVariant metadata?
// or will that be handled elsewhere?
field, offsets, new_values, nulls,
)))
} else {
Err(Error::unexpected_column_type(kernel_dt))
}
}
DataType::Map(kernel_map_type) => {
if let Some(map_array) = arr.as_any().downcast_ref::<MapArray>() {
let (field, offsets, entries, nulls, ordered) = map_array.clone().into_parts();
let new_entries = StructArray::new(
entries.fields().clone(),
vec![
entries.column(0).clone(),
// Map keys cannot be of variant type.
variant_coalesce_impl(
entries.column(1).clone(),
kernel_map_type.value_type(),
)?,
],
entries.nulls().cloned(),
);
Ok(Arc::<MapArray>::new(MapArray::new(
// TODO(r.chen): Should we be setting this with the new isVariant metadata?
// or will that be handled elsewhere?
field,
offsets,
new_entries,
nulls,
ordered,
)))
} else {
Err(Error::unexpected_column_type(kernel_dt))
}
}
DataType::Primitive(primitive_type) => match primitive_type {
PrimitiveType::Variant => {
if let Some(struct_array) = arr.as_any().downcast_ref::<StructArray>() {
let (fields, arrays, nulls) = struct_array.clone().into_parts();

let value_idx = if let Some((idx, field)) = fields.find("value") {
Collaborator: I think it's ok to require the physical schema to be in a particular order, rather than checking at runtime like this. In theory, kernel is -- or could be -- in charge of the schema that goes to parquet reader, so it should always match?
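
Under that assumption, the runtime lookup below could collapse to a pass-through with a sanity check, roughly (a sketch, not part of this PR):

let (fields, arrays, nulls) = struct_array.clone().into_parts();
debug_assert_eq!(fields[0].name(), "value");
debug_assert_eq!(fields[1].name(), "metadata");
Ok(Arc::new(StructArray::new(fields, arrays, nulls)))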

Collaborator: Also: If we do fix the field ordering, can the VariantCoalesce operation (and variant_coalesce_impl function) just disappear? Or are they accomplishing something else besides field reordering that I didn't notice?

if field.data_type() == &ArrowDataType::Binary {
Ok::<usize, Error>(idx)
} else {
Err(Error::invalid_variant_representation(
"\"value\" field is not of binary type.",
Collaborator: I think we usually surround column names with ' rather than "?
(better match with SQL, but also avoids needing escape characters here)

(several more below)

))
}
} else {
Err(Error::invalid_variant_representation(
"\"value\" field is not found.",
))
}?;

let metadata_idx = if let Some((idx, field)) = fields.find("metadata") {
if field.data_type() == &ArrowDataType::Binary {
Ok::<usize, Error>(idx)
} else {
Err(Error::invalid_variant_representation(
"\"metadata\" field is not of binary type.",
))
}
} else {
Err(Error::invalid_variant_representation(
"\"metadata\" field is not found.",
))
}?;

let new_fields = Fields::from(vec![
fields[value_idx].clone(),
fields[metadata_idx].clone(),
]);
let new_arrays = vec![
Arc::clone(&arrays[value_idx]),
Arc::clone(&arrays[metadata_idx]),
];
Ok(Arc::new(StructArray::new(new_fields, new_arrays, nulls)))
} else {
Err(Error::invalid_variant_representation(
"variants should be represented in storage as structs.",
))
}
}
_ => Ok(Arc::new(arr)),
Collaborator: I think this code only succeeds by accident, because Arc implements the AsRef trait...

Suggested change
_ => Ok(Arc::new(arr)),
_ => Ok(arr),

},
}
}

fn evaluate_expression(
expression: &Expression,
batch: &RecordBatch,
@@ -288,6 +411,12 @@ fn evaluate_expression(
Ok(match op {
UnaryOperator::Not => Arc::new(not(downcast_to_bool(&arr)?)?),
UnaryOperator::IsNull => Arc::new(is_null(&arr)?),
UnaryOperator::VariantCoalesce => {
let result_type = result_type.ok_or(Error::generic(
"Kernel data type must be supplied to the 'variant_coalesce' expression.",
))?;
variant_coalesce_impl(arr, result_type)?
}
})
}
(BinaryOperation { op, left, right }, _) => {
6 changes: 6 additions & 0 deletions kernel/src/error.rs
@@ -147,6 +147,9 @@ pub enum Error {
/// Inconsistent data passed to struct scalar
#[error("Invalid struct data: {0}")]
InvalidStructData(String),

#[error("Invalid variant representation: {0}")]
InvalidVariantRepresentation(String),
}

// Convenience constructors for Error types that take a String argument
@@ -192,6 +195,9 @@ impl Error {
pub fn invalid_struct_data(msg: impl ToString) -> Self {
Self::InvalidStructData(msg.to_string())
}
pub fn invalid_variant_representation(msg: impl ToString) -> Self {
Self::InvalidVariantRepresentation(msg.to_string())
}

// Capture a backtrace when the error is constructed.
#[must_use]
8 changes: 8 additions & 0 deletions kernel/src/expressions/mod.rs
@@ -70,6 +70,8 @@ pub enum UnaryOperator {
Not,
/// Unary Is Null
IsNull,
/// Unary VariantCoalesce
VariantCoalesce,
}

/// A SQL expression.
@@ -135,6 +137,7 @@ impl Display for Expression {
Self::UnaryOperation { op, expr } => match op {
UnaryOperator::Not => write!(f, "NOT {}", expr),
UnaryOperator::IsNull => write!(f, "{} IS NULL", expr),
UnaryOperator::VariantCoalesce => write!(f, "VARIANT_COALESCE({})", expr),
},
Self::VariadicOperation { op, exprs } => match op {
VariadicOperator::And => {
@@ -227,6 +230,11 @@ impl Expression {
Self::unary(UnaryOperator::IsNull, self)
}

/// Create a new expression `variant_coalesce(self)`
pub fn variant_coalesce(self) -> Self {
Self::unary(UnaryOperator::VariantCoalesce, self)
}

/// Create a new expression `self == other`
pub fn eq(self, other: Self) -> Self {
Self::binary(BinaryOperator::Equal, self, other)
3 changes: 3 additions & 0 deletions kernel/src/expressions/scalars.rs
@@ -1,3 +1,4 @@
use core::panic;
use std::cmp::Ordering;
use std::fmt::{Display, Formatter};

@@ -314,6 +315,8 @@ impl PrimitiveType {
_ => unreachable!(),
}
}
// TODO(r.chen): handle this
Variant => panic!("UNSUPPORTED")
Collaborator:
Suggested change
Variant => panic!("UNSUPPORTED")
Variant => todo!(),

}
}

42 changes: 41 additions & 1 deletion kernel/src/scan/mod.rs
@@ -1,5 +1,6 @@
//! Functionality to create and execute scans (reads) over data stored in a delta table

use std::ptr::null;
use std::sync::Arc;

use itertools::Itertools;
@@ -269,6 +270,45 @@ impl Scan {
let read_results =
parquet_handler.read_parquet_files(&[meta], self.physical_schema.clone(), None)?;

// The rest of the kernel expects variants to be reconstructed, so they must be
// reconstructed immediately after scanning the parquet files.
let variant_coalesce_expr = match &self.snapshot.protocol().reader_features {
Some(reader_features) => {
// TODO(r.chen): Only check this if the variantType feature is enabled.
// if reader_features.contains(&"variantType-preview".to_string())
Comment on lines +276 to +278
Collaborator: Should be able to add the check directly to the match?

                Some(reader_features) if reader_features.contains(...) => {
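
Filled in with the feature name this PR already references in the TODO, that would read (a sketch):

let variant_coalesce_expr = match &self.snapshot.protocol().reader_features {
    Some(reader_features)
        if reader_features.contains(&"variantType-preview".to_string()) =>
    {
        // build all_fields_expr as below
        Some(Expression::Struct(all_fields_expr))
    }
    _ => None,
};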

let all_fields_expr =
self.physical_schema
.fields
.iter()
.map(|(_, field)| {
Expression::variant_coalesce(Expression::column(
field.name.clone(),
))
Comment on lines +284 to +286
Collaborator: Since Expression::column takes an impl ToString, I think this should work?

Suggested change
Expression::variant_coalesce(Expression::column(
field.name.clone(),
))
Expression::variant_coalesce(Expression::column(field.name))

Collaborator (@scovich, Jul 1, 2024): aside: (attn @nicklan) -- The blanket implementation of ToString trait will make a copy of even an owned string, because it's implemented in terms of Display. Maybe that's not quite what we want?

Maybe these methods should take impl Into<String> instead? Technically, they're not equivalent (can't have an impl<T: IntoString> From<T> for String because it would conflict with the blanket impl<T> From<T> for T). That said, String has From<T> for all the "obvious" types like &str and &String, so maybe we can get away with it in practice?
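
For illustration, the Into<String> flavor would look like this (hypothetical signature, not the current kernel API; Expression::Column stands in for however the variant is actually spelled):

pub fn column(name: impl Into<String>) -> Expression {
    // An owned String moves in without a copy; &str still converts.
    Expression::Column(name.into())
}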

})
.collect::<Vec<_>>();
Collaborator: Turbofish shouldn't be needed, because Expression::Struct already constrains it to be a Vec?

Suggested change
.collect::<Vec<_>>();
.collect();

Some(Expression::Struct(all_fields_expr))
}
None => None,
};

let mut variant_coalesce_results: Vec<DeltaResult<Box<dyn EngineData>>> = vec![];
match variant_coalesce_expr {
Some(expr) => {
for result in read_results {
let coalesced_batch = engine
.get_expression_handler()
.get_evaluator(
self.physical_schema.clone(),
expr.clone(),
output_schema.clone(),
)
.evaluate(result?.as_ref());
variant_coalesce_results.push(coalesced_batch);
}
}
None => (),
Collaborator: This doesn't look right... if variant is disabled then the entire query result is thrown away?

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Seems like it would be a lot cleaner to have a two pass approach:

  • At query compilation time, verify that variant mode is actually enabled if any variant column is found in the schema
  • At runtime, unconditionally apply VariantCoalesce, since it's anyway a no-op for non-variant columns

Then we don't need all this option and if/else control flow here?
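
A sketch of the runtime half of that two-pass approach: apply VariantCoalesce unconditionally over every top-level column (names as in this diff; error plumbing unchanged):

let coalesce_expr = Expression::Struct(
    self.physical_schema
        .fields
        .iter()
        .map(|(_, field)| Expression::column(field.name.clone()).variant_coalesce())
        .collect(),
);
let evaluator = engine.get_expression_handler().get_evaluator(
    self.physical_schema.clone(),
    coalesce_expr,
    output_schema.clone(),
);
let variant_coalesce_results: Vec<DeltaResult<Box<dyn EngineData>>> = read_results
    .into_iter()
    .map(|result| evaluator.evaluate(result?.as_ref()))
    .collect();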

};

let read_expression = if self.have_partition_cols
|| self.snapshot.column_mapping_mode != ColumnMappingMode::None
{
@@ -307,7 +347,7 @@ impl Scan {

let mut dv_mask = dv_treemap.map(treemap_to_bools);

for read_result in read_results {
for read_result in variant_coalesce_results {
let len = if let Ok(ref res) = read_result {
res.length()
} else {
5 changes: 4 additions & 1 deletion kernel/src/schema.rs
@@ -358,6 +358,7 @@ pub enum PrimitiveType {
Timestamp,
#[serde(rename = "timestamp_ntz")]
TimestampNtz,
Variant,
#[serde(
serialize_with = "serialize_decimal",
deserialize_with = "deserialize_decimal",
@@ -418,7 +419,8 @@ impl Display for PrimitiveType {
PrimitiveType::TimestampNtz => write!(f, "timestamp_ntz"),
PrimitiveType::Decimal(precision, scale) => {
write!(f, "decimal({},{})", precision, scale)
}
},
Collaborator: I don't think commas are required after case arms that use curly braces?

PrimitiveType::Variant => write!(f, "variant")
}
}
}
@@ -476,6 +478,7 @@ impl DataType {
pub const DATE: Self = DataType::Primitive(PrimitiveType::Date);
pub const TIMESTAMP: Self = DataType::Primitive(PrimitiveType::Timestamp);
pub const TIMESTAMP_NTZ: Self = DataType::Primitive(PrimitiveType::TimestampNtz);
pub const VARIANT: Self = DataType::Primitive(PrimitiveType::Variant);

pub fn decimal(precision: u8, scale: u8) -> DeltaResult<Self> {
PrimitiveType::check_decimal(precision, scale)?;