use a single row_count column during predicate pruning instead of one per column #14295

Open

adriangb wants to merge 2 commits into main from only-require-a-single-row-count-2
Conversation

@adriangb (Contributor) commented Jan 25, 2025

@github-actions bot added the optimizer (Optimizer rules) and core (Core DataFusion crate) labels on Jan 25, 2025
@adriangb (Contributor, Author) commented:
@alamb this seems to be a simple enough change that is almost self-contained: the only breaking change I see would be in PredicateRewriter, which we recently introduced in #12850. I can't be sure of course, but I'd guess we (Pydantic) are the only ones using it, and we'd be happy to absorb the breaking change. The alternative would be to add an option to PredicateRewriter to control this behavior, which requires more code, etc. Up to you 😄.
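For illustration, the opt-in alternative might look something like this (a hypothetical option struct; this is not the real PredicateRewriter API, and not what this PR implements):

    // Hypothetical sketch of the rejected alternative: an option on the
    // rewriter that preserves per-column row_count fields by default.
    // The PR instead changes the rewrite behavior unconditionally.
    #[derive(Default)]
    struct RewriterOptions {
        /// When true, emit one shared `row_count` field instead of one
        /// `{column}_row_count` field per referenced column.
        single_row_count_column: bool,
    }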

@adriangb (Contributor, Author) commented:
I want to point out that this works because of how the RecordBatch is generated:

    for (column, statistics_type, stat_field) in required_columns.iter() {
        let column = Column::from_name(column.name());
        let data_type = stat_field.data_type();
        let num_containers = statistics.num_containers();
        // Fetch the requested statistic for this column; each arm yields
        // one array with a value per container.
        let array = match statistics_type {
            StatisticsType::Min => statistics.min_values(&column),
            StatisticsType::Max => statistics.max_values(&column),
            StatisticsType::NullCount => statistics.null_counts(&column),
            StatisticsType::RowCount => statistics.row_counts(&column),
        };
        // Missing statistics become an all-null array of the right length
        let array = array.unwrap_or_else(|| new_null_array(data_type, num_containers));
        if num_containers != array.len() {
            return internal_err!(
                "mismatched statistics length. Expected {}, got {}",
                num_containers,
                array.len()
            );
        }
        // cast statistics array to required data type (e.g. parquet
        // provides timestamp statistics as "Int64")
        let array = arrow::compute::cast(&array, data_type)?;
        fields.push(stat_field.clone());
        arrays.push(array);
    }

Since the batch is generated from the columns tracked by RequiredColumns, we can rename the field internally with no consequences.

This should also save some work in creating the array, make scanning the record batch faster, etc.
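A minimal sketch of the resulting naming scheme (illustrative only: the enum mirrors DataFusion's StatisticsType, but the helper is hypothetical). Per-column statistics keep their {column}_{stat} field names, while every RowCount request maps to the same shared row_count field, so the statistics batch carries that array once instead of once per column:

    // Sketch only: stat_field_name is a hypothetical helper showing the
    // naming scheme, not DataFusion's actual internals.
    #[derive(Clone, Copy)]
    enum StatisticsType {
        Min,
        Max,
        NullCount,
        RowCount,
    }

    fn stat_field_name(column_name: &str, t: StatisticsType) -> String {
        match t {
            StatisticsType::Min => format!("{column_name}_min"),
            StatisticsType::Max => format!("{column_name}_max"),
            StatisticsType::NullCount => format!("{column_name}_null_count"),
            // Shared across all columns: a container's row count does not
            // depend on which column's predicate asked for it.
            StatisticsType::RowCount => "row_count".to_string(),
        }
    }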

@github-actions bot added the sqllogictest (SQL Logic Tests (.slt)) label on Jan 25, 2025
@adriangb force-pushed the only-require-a-single-row-count-2 branch from adf5e7c to da0f264 on January 25, 2025 at 18:55
@alamb (Contributor) left a comment:
Thank you @adriangb -- I think this is a nice improvement

My only concern is that the RequiredColumns structure can now have repeated field names, which could potentially result in an error when creating a RecordBatch to feed to the pruning statistics.

I wrote a test case showing the issue I am worried about below.

That said, since all the tests are passing it evidently works, and this PR is good to go in my mind.

Maybe as a follow-on PR we can make an API-breaking change (perhaps via deprecation) to update PruningStatistics so that row_count is no longer a function of a column.

    #[test]
    fn test_unique_field_names() {
        // c1 = 100 AND c2 = 200
        let schema: SchemaRef = Arc::new(Schema::new(vec![
            Field::new("c1", DataType::Int32, true),
            Field::new("c2", DataType::Int32, true),
        ]));
        let expr = col("c1").eq(lit(100)).and(col("c2").eq(lit(200)));
        let expr = logical2physical(&expr, &schema);
        let p = PruningPredicate::try_new(expr, Arc::clone(&schema)).unwrap();
        // note pruning expression refers to row_count twice
        assert_eq!(
            "c1_null_count@2 != row_count@3 AND c1_min@0 <= 100 AND 100 <= c1_max@1 AND c2_null_count@6 != row_count@7 AND c2_min@4 <= 200 AND 200 <= c2_max@5",
            p.predicate_expr.to_string()
        );

        // Fields in required schema should be unique, otherwise when creating batches
        // it will fail because of duplicate field names
        let mut fields = HashSet::new();
        for (_col, _ty, field) in p.required_columns().iter() {
            let was_new = fields.insert(field);
            if !was_new {
                panic!(
                    "Duplicate field in required schema: {:?}. Previous fields:\n{:#?}",
                    field, fields
                );
            }
        }
    }

Fails like this:

thread 'pruning::tests::test_unique_field_names' panicked at datafusion/physical-optimizer/src/pruning.rs:4193:17:
Duplicate field in required schema: Field { name: "row_count", data_type: UInt64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }. Previous fields:
{
    Field {
        name: "c2_min",
        data_type: Int32,
        nullable: true,
        dict_id: 0,
        dict_is_ordered: false,
        metadata: {},
    },
    Field {
        name: "c1_max",
        data_type: Int32,
        nullable: true,
        dict_id: 0,
        dict_is_ordered: false,
        metadata: {},
    },
    Field {
        name: "c1_null_count",
        data_type: UInt64,
        nullable: true,
        dict_id: 0,
        dict_is_ordered: false,
        metadata: {},
    },
    Field {
        name: "row_count",
        data_type: UInt64,
        nullable: true,
        dict_id: 0,
        dict_is_ordered: false,
        metadata: {},
    },
    Field {
        name: "c2_max",
        data_type: Int32,
        nullable: true,
        dict_id: 0,
        dict_is_ordered: false,
        metadata: {},
    },
    Field {
        name: "c1_min",
        data_type: Int32,
        nullable: true,
        dict_id: 0,
        dict_is_ordered: false,
        metadata: {},
    },
    Field {
        name: "c2_null_count",
        data_type: UInt64,
        nullable: true,
        dict_id: 0,
        dict_is_ordered: false,
        metadata: {},
    },
}
stack backtrace:
   0: rust_begin_unwind
             at /rustc/e71f9a9a98b0faf423844bf0ba7438f29dc27d58/library/std/src/panicking.rs:665:5
   1: core::panicking::panic_fmt
             at /rustc/e71f9a9a98b0faf423844bf0ba7438f29dc27d58/library/core/src/panicking.rs:76:14
   2: datafusion_physical_optimizer::pruning::tests::test_unique_field_names
             at ./src/pruning.rs:4193:17
   3: datafusion_physical_optimizer::pruning::tests::test_unique_field_names::{{closure}}
             at ./src/pruning.rs:4172:33
   4: core::ops::function::FnOnce::call_once
             at /Users/andrewlamb/.rustup/toolchains/stable-aarch64-apple-darwin/lib/rustlib/src/rust/library/core/src/ops/function.rs:250:5
   5: core::ops::function::FnOnce::call_once
             at /rustc/e71f9a9a98b0faf423844bf0ba7438f29dc27d58/library/core/src/ops/function.rs:250:5
note: Some details are omitted, run with `RUST_BACKTRACE=full` for a verbose backtrace.

On this hunk of the diff:

    StatisticsType::Max => "max",
    StatisticsType::NullCount => "null_count",
    StatisticsType::RowCount => "row_count",
    let column_name = column.name();
The only question I have is that PruningStatistics is still defined in terms of Column:

    /// [`UInt64Array`]: arrow::array::UInt64Array
    fn row_counts(&self, column: &Column) -> Option<ArrayRef>;

So it seems that when building the schema for required columns there will be multiple entries for row_count (I provided a test elsewhere):

pub struct RequiredColumns {
    /// The statistics required to evaluate this predicate:
    /// * The unqualified column in the input schema
    /// * Statistics type (e.g. Min or Max or Null_Count)
    /// * The field the statistics value should be placed in for
    ///   pruning predicate evaluation (e.g. `min_value` or `max_value`)
    columns: Vec<(phys_expr::Column, StatisticsType, Field)>,
}
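For reference, a sketch of what that follow-on change might look like (speculative, not a merged design; the real PruningStatistics trait has more methods): since the row count is a property of the container rather than of any column, row_counts could drop its column argument entirely:

    use arrow::array::ArrayRef;

    // Speculative trait shape, for illustration only.
    trait PruningStatistics {
        /// Number of containers (e.g. files or row groups) being pruned.
        fn num_containers(&self) -> usize;

        /// The row count of each container, as a single array; no longer
        /// keyed by column.
        fn row_counts(&self) -> Option<ArrayRef>;
    }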

Labels
core Core DataFusion crate optimizer Optimizer rules sqllogictest SQL Logic Tests (.slt)
Successfully merging this pull request may close these issues.

Why does PruningPredicate reference a row_count for each column?