Skip to content

Commit

Permalink
feat: ignorable errors
Browse files Browse the repository at this point in the history
  • Loading branch information
eliaperantoni committed Dec 11, 2024
1 parent 2d3e2a1 commit 0506a80
Show file tree
Hide file tree
Showing 31 changed files with 276 additions and 125 deletions.
7 changes: 3 additions & 4 deletions datafusion/common/src/column.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,8 +33,8 @@ use std::fmt;
use std::str::FromStr;

/// A named reference to a qualified field in a schema.
#[derive(Debug, Clone, Derivative)]
#[derivative(PartialEq, Eq, Hash, PartialOrd, Ord)]
#[derive(Debug, Derivative)]
#[derivative(PartialEq, Eq, Hash, PartialOrd, Ord, Clone)]
pub struct Column {
/// relation/table reference.
pub relation: Option<TableReference>,
Expand Down Expand Up @@ -254,8 +254,7 @@ impl Column {
.flat_map(|s| s.columns_with_unqualified_name(&self.name))
.collect::<Vec<_>>();
for using_col in using_columns {
let all_matched =
columns.iter().all(|c| using_col.contains(c));
let all_matched = columns.iter().all(|c| using_col.contains(c));
// All matched fields belong to the same using column set, in orther words
// the same join clause. We simply pick the qualifier from the first match.
if all_matched {
Expand Down
4 changes: 2 additions & 2 deletions datafusion/common/src/dfschema/fields_spans.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ pub struct FieldsSpans(Vec<Vec<Span>>);

impl FieldsSpans {
pub fn empty(field_count: usize) -> Self {
Self((0..field_count).into_iter().map(|_| Vec::new()).collect())
Self((0..field_count).map(|_| Vec::new()).collect())
}

pub fn iter(&self) -> impl Iterator<Item = &Vec<Span>> {
Expand All @@ -20,7 +20,7 @@ impl FieldsSpans {
&self,
other: &FieldsSpans,
join_type: &JoinType,
left_cols_len: usize,
_left_cols_len: usize,
) -> FieldsSpans {
match join_type {
JoinType::Inner | JoinType::Left | JoinType::Right | JoinType::Full => {
Expand Down
1 change: 0 additions & 1 deletion datafusion/common/src/dfschema/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@
mod fields_spans;
pub use fields_spans::FieldsSpans;

use std::backtrace::Backtrace;
use std::collections::{BTreeSet, HashMap, HashSet};
use std::fmt::{Display, Formatter};
use std::hash::Hash;
Expand Down
37 changes: 22 additions & 15 deletions datafusion/common/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ use std::sync::Arc;

use crate::diagnostic::Diagnostic;
use crate::utils::quote_identifier;
use crate::{Column, DFSchema, DiagnosticEntry, DiagnosticEntryKind, TableReference};
use crate::{Column, DFSchema, TableReference};
#[cfg(feature = "avro")]
use apache_avro::Error as AvroError;
use arrow::error::ArrowError;
Expand Down Expand Up @@ -514,8 +514,7 @@ impl DataFusionError {
if let Some(source) = self
.head
.source()
.map(|source| source.downcast_ref::<DataFusionError>())
.flatten()
.and_then(|source| source.downcast_ref::<DataFusionError>())
{
self.head = source;
} else {
Expand All @@ -528,7 +527,9 @@ impl DataFusionError {
DiagnosticsIterator { head: self }
}

pub fn get_individual_errors(&self) -> impl Iterator<Item = &Self> + '_ {
pub fn get_individual_errors(
&self,
) -> impl Iterator<Item = (Vec<Diagnostic>, &Self)> + '_ {
fn contains_collection(err: &DataFusionError) -> bool {
let mut head = err;
loop {
Expand All @@ -538,8 +539,7 @@ impl DataFusionError {

if let Some(source) = head
.source()
.map(|source| source.downcast_ref::<DataFusionError>())
.flatten()
.and_then(|source| source.downcast_ref::<DataFusionError>())
{
head = source;
} else {
Expand All @@ -549,37 +549,44 @@ impl DataFusionError {
}

struct IndividualErrorsIterator<'a> {
queue: Vec<&'a DataFusionError>,
queue: Vec<(&'a DataFusionError, Vec<Diagnostic>)>,
}

impl<'a> Iterator for IndividualErrorsIterator<'a> {
type Item = &'a DataFusionError;
type Item = (Vec<Diagnostic>, &'a DataFusionError);

fn next(&mut self) -> Option<Self::Item> {
while let Some(err) = self.queue.pop() {
while let Some((err, mut diagnostics_prefix)) = self.queue.pop() {
if !contains_collection(err) {
return Some(err);
return Some((diagnostics_prefix, err));
}

if let DataFusionError::Collection(errs) = err {
self.queue.extend(errs.iter());
self.queue.extend(
errs.iter().map(|err| (err, diagnostics_prefix.clone())),
);
continue;
}

if let DataFusionError::Diagnostic(diagnostics, _) = err {
diagnostics_prefix.push(diagnostics.clone());
}

if let Some(source) = err
.source()
.map(|source| source.downcast_ref::<DataFusionError>())
.flatten()
.and_then(|source| source.downcast_ref::<DataFusionError>())
{
self.queue.push(source);
self.queue.push((source, diagnostics_prefix));
}
}

None
}
}

IndividualErrorsIterator { queue: vec![self] }
IndividualErrorsIterator {
queue: vec![(self, vec![])],
}
}
}

Expand Down
6 changes: 3 additions & 3 deletions datafusion/common/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ pub mod alias;
pub mod cast;
pub mod config;
pub mod cse;
pub mod diagnostic;
pub mod display;
pub mod error;
pub mod file_options;
Expand All @@ -47,15 +48,15 @@ pub mod test_util;
pub mod tree_node;
pub mod types;
pub mod utils;
pub mod diagnostic;
pub mod with_span;

/// Reexport arrow crate
pub use arrow;
pub use column::Column;
pub use dfschema::{
qualified_name, DFSchema, DFSchemaRef, ExprSchema, SchemaExt, ToDFSchema, FieldsSpans,
qualified_name, DFSchema, DFSchemaRef, ExprSchema, FieldsSpans, SchemaExt, ToDFSchema,
};
pub use diagnostic::{Diagnostic, DiagnosticEntry, DiagnosticEntryKind};
pub use error::{
field_not_found, unqualified_field_not_found, DataFusionError, Result, SchemaError,
SharedResult,
Expand All @@ -78,7 +79,6 @@ pub use stats::{ColumnStatistics, Statistics};
pub use table_reference::{ResolvedTableReference, TableReference};
pub use unnest::{RecursionUnnestOption, UnnestOptions};
pub use utils::project_schema;
pub use diagnostic::{Diagnostic, DiagnosticEntry, DiagnosticEntryKind};
pub use with_span::WithSpans;

// These are hidden from docs purely to avoid polluting the public view of what this crate exports.
Expand Down
1 change: 0 additions & 1 deletion datafusion/common/src/with_span.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
use std::{
cmp::Ordering,
fmt::{self, Debug, Display},
ops::{Deref, DerefMut},
};
Expand Down
1 change: 0 additions & 1 deletion datafusion/expr-common/src/type_coercion/binary.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,6 @@ use datafusion_common::{
Diagnostic, DiagnosticEntry, DiagnosticEntryKind, Result, WithSpans,
};
use itertools::Itertools;
use sqlparser::tokenizer::Span;

/// The type signature of an instantiation of binary operator expression such as
/// `lhs + rhs`
Expand Down
1 change: 1 addition & 0 deletions datafusion/expr/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ serde_json = { workspace = true }
sqlparser = { workspace = true }
strum = { version = "0.26.1", features = ["derive"] }
strum_macros = "0.26.0"
derivative = { workspace = true }

[dev-dependencies]
ctor = { workspace = true }
Expand Down
37 changes: 32 additions & 5 deletions datafusion/expr/src/expr.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ use datafusion_common::{
plan_err, Column, DFSchema, HashMap, Result, ScalarValue, TableReference,
};
use datafusion_functions_window_common::field::WindowUDFFieldArgs;
use derivative::Derivative;
use sqlparser::ast::{
display_comma_separated, ExceptSelectItem, ExcludeSelectItem, IlikeSelectItem,
NullTreatment, RenameSelectItem, ReplaceSelectElement,
Expand Down Expand Up @@ -401,11 +402,19 @@ impl Unnest {
}

/// Alias expression
#[derive(Clone, PartialEq, Eq, PartialOrd, Hash, Debug)]
#[derive(Clone, Derivative, Debug)]
#[derivative(PartialEq, Eq, PartialOrd, Hash)]
pub struct Alias {
pub expr: Box<Expr>,
pub relation: Option<TableReference>,
pub name: String,
#[derivative(
PartialEq = "ignore",
Hash = "ignore",
PartialOrd = "ignore",
Ord = "ignore"
)]
pub span: Span,
}

impl Alias {
Expand All @@ -419,8 +428,13 @@ impl Alias {
expr: Box::new(expr),
relation: relation.map(|r| r.into()),
name: name.into(),
span: Span::empty(),
}
}

pub fn with_span(self, span: Span) -> Self {
Self { span, ..self }
}
}

/// Binary expression
Expand Down Expand Up @@ -1128,6 +1142,7 @@ impl Expr {
relation,
name,
spans: _,
..
}) => (relation.clone(), name.clone()),
Expr::Alias(Alias { relation, name, .. }) => (relation.clone(), name.clone()),
_ => (None, self.schema_name().to_string()),
Expand Down Expand Up @@ -1681,11 +1696,22 @@ impl Expr {
}
}

pub fn get_spans(&self) -> Option<&Vec<Span>> {
pub fn get_span(&self) -> Span {
match self {
Expr::Column(Column { spans, .. }) => Some(spans),
Expr::Alias(Alias { expr, .. }) => expr.get_spans(),
_ => None,
Expr::Column(Column { spans, .. }) => match spans.as_slice() {
[] => panic!("No spans for column expr"),
[span] => *span,
_ => panic!("Column expr has more than one span"),
},
Expr::Alias(Alias {
expr,
span: alias_span,
..
}) => {
let span = expr.get_span();
span.union(alias_span)
}
_ => Span::empty(),
}
}
}
Expand All @@ -1701,6 +1727,7 @@ impl HashNode for Expr {
expr: _expr,
relation,
name,
..
}) => {
relation.hash(state);
name.hash(state);
Expand Down
1 change: 1 addition & 0 deletions datafusion/expr/src/expr_rewriter/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -186,6 +186,7 @@ pub fn create_col_from_scalar_expr(
relation: _,
name,
spans,
..
}) => Ok(
Column::new(Some::<TableReference>(subqry_alias.into()), name)
.with_spans(spans.iter().copied()),
Expand Down
1 change: 0 additions & 1 deletion datafusion/expr/src/expr_rewriter/order_by.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@ use crate::{expr::Sort, Cast, Expr, LogicalPlan, TryCast};

use datafusion_common::tree_node::{Transformed, TransformedResult, TreeNode};
use datafusion_common::{Column, Result};
use sqlparser::tokenizer::Span;

/// Rewrite sort on aggregate expressions to sort on the column of aggregate output
/// For example, `max(x)` is written to `col("max(x)")`
Expand Down
9 changes: 5 additions & 4 deletions datafusion/expr/src/expr_schema.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ use datafusion_common::{
};
use datafusion_functions_window_common::field::WindowUDFFieldArgs;
use recursive::recursive;
use sqlparser::tokenizer::Span;
use std::collections::HashMap;
use std::sync::Arc;

Expand Down Expand Up @@ -405,16 +406,16 @@ impl ExprSchemable for Expr {
}) => {
let (left_type, left_is_nullable) =
left.data_type_and_nullable(schema)?;
let left_type = if let Some(spans) = left.get_spans() {
WithSpans::new(&left_type, spans.iter().copied())
let left_type = if left.get_span() != Span::empty() {
WithSpans::new(&left_type, [left.get_span()])
} else {
(&left_type).into()
};

let (right_type, right_is_nullable) =
right.data_type_and_nullable(schema)?;
let right_type = if let Some(spans) = right.get_spans() {
WithSpans::new(&right_type, spans.iter().copied())
let right_type = if right.get_span() != Span::empty() {
WithSpans::new(&right_type, [right.get_span()])
} else {
(&right_type).into()
};
Expand Down
10 changes: 8 additions & 2 deletions datafusion/expr/src/logical_plan/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1543,14 +1543,20 @@ pub fn union(left_plan: LogicalPlan, right_plan: LogicalPlan) -> Result<LogicalP
Span::empty(),
),
DiagnosticEntry::new(
format!("This side has {} columns", left_plan.schema().fields().len()),
format!(
"This side has {} columns",
left_plan.schema().fields().len()
),
DiagnosticEntryKind::Note,
Span::union_iter(
left_plan.schema().fields_spans().iter().flatten().copied(),
),
),
DiagnosticEntry::new(
format!("This side has {} columns", right_plan.schema().fields().len()),
format!(
"This side has {} columns",
right_plan.schema().fields().len()
),
DiagnosticEntryKind::Note,
Span::union_iter(
right_plan.schema().fields_spans().iter().flatten().copied(),
Expand Down
6 changes: 1 addition & 5 deletions datafusion/expr/src/logical_plan/plan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2163,12 +2163,8 @@ impl Projection {
/// produced by the projection operation. If the schema computation is successful,
/// the `Result` will contain the schema; otherwise, it will contain an error.
pub fn projection_schema(input: &LogicalPlan, exprs: &[Expr]) -> Result<Arc<DFSchema>> {

let metadata = input.schema().metadata().clone();
let fields_spans = exprs
.iter()
.map(|e| e.get_spans().cloned().unwrap_or_else(|| vec![]))
.collect();
let fields_spans = exprs.iter().map(|e| vec![e.get_span()]).collect();

let schema =
DFSchema::new_with_metadata(exprlist_to_fields(exprs, input)?, metadata)?
Expand Down
11 changes: 10 additions & 1 deletion datafusion/expr/src/tree_node.rs
Original file line number Diff line number Diff line change
Expand Up @@ -127,7 +127,16 @@ impl TreeNode for Expr {
expr,
relation,
name,
}) => f(*expr)?.update_data(|e| e.alias_qualified(relation, name)),
span,
..
}) => f(*expr)?.update_data(|e| {
let e = e.alias_qualified(relation, name);
if let Expr::Alias(alias) = e {
Expr::Alias(alias.with_span(span))
} else {
unreachable!();
}
}),
Expr::InSubquery(InSubquery {
expr,
subquery,
Expand Down
Loading

0 comments on commit 0506a80

Please sign in to comment.