Describe the bug
Small datasets undergo unnecessary repartitioning.
For example, a dataset of 5 rows will be split across multiple partitions for certain queries, leading to overhead that exceeds any parallelism benefit.
To Reproduce
An example can be found in the aggregate_repartition.slt file. This test creates two 5-row tables, one in Parquet and one in CSV format, and runs an aggregation on one of their columns. Despite the small size, the query plan repartitions the data:
Setup the data:
COPY (
SELECT * FROM (VALUES
('prod', 100, 'A'),
('dev', 200, 'B'),
('test', 150, 'A'),
('prod', 300, 'C'),
('dev', 250, 'B')
) AS t(env, value, category)
)
TO 'test_files/scratch/aggregate_repartition/dim.csv'
STORED AS CSV
OPTIONS ('format.has_header' 'true');
COPY (
SELECT * FROM (VALUES
('prod', 100, 'A'),
('dev', 200, 'B'),
('test', 150, 'A'),
('prod', 300, 'C'),
('dev', 250, 'B')
) AS t(env, value, category)
)
TO 'test_files/scratch/aggregate_repartition/dim.parquet'
STORED AS PARQUET;
CREATE EXTERNAL TABLE dim_csv
STORED AS CSV
LOCATION 'test_files/scratch/aggregate_repartition/dim.csv'
OPTIONS ('format.has_header' 'true');
CREATE EXTERNAL TABLE dim_parquet
STORED AS PARQUET
LOCATION 'test_files/scratch/aggregate_repartition/dim.parquet';
CSV Physical Plan:
EXPLAIN SELECT env, count(*) FROM dim_csv GROUP BY env;
physical_plan
01)ProjectionExec: expr=[env@0 as env, count(Int64(1))@1 as count(*)]
02)--AggregateExec: mode=FinalPartitioned, gby=[env@0 as env], aggr=[count(Int64(1))]
03)----CoalesceBatchesExec: target_batch_size=8192
04)------RepartitionExec: partitioning=Hash([env@0], 4), input_partitions=4
05)--------AggregateExec: mode=Partial, gby=[env@0 as env], aggr=[count(Int64(1))]
06)----------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1
07)------------DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/aggregate_repartition/dim.csv]]}, projection=[env], file_type=csv, has_header=true
Parquet Physical Plan:
EXPLAIN SELECT env, count(*) FROM dim_parquet GROUP BY env;
physical_plan
01)ProjectionExec: expr=[env@0 as env, count(Int64(1))@1 as count(*)]
02)--AggregateExec: mode=FinalPartitioned, gby=[env@0 as env], aggr=[count(Int64(1))]
03)----CoalesceBatchesExec: target_batch_size=8192
04)------RepartitionExec: partitioning=Hash([env@0], 4), input_partitions=1
05)--------AggregateExec: mode=Partial, gby=[env@0 as env], aggr=[count(Int64(1))]
06)----------DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/aggregate_repartition/dim.parquet]]}, projection=[env], file_type=parquet
Expected behavior
Datasets this small should not be repartitioned, so the queries should yield the following plans:
CSV Physical Plan:
EXPLAIN SELECT env, count(*) FROM dim_csv GROUP BY env;
physical_plan
01)ProjectionExec: expr=[env@0 as env, count(Int64(1))@1 as count(*)]
02)--AggregateExec: mode=FinalPartitioned, gby=[env@0 as env], aggr=[count(Int64(1))]
03)----DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/aggregate_repartition/dim.csv]]}, projection=[env], file_type=csv, has_header=true
Parquet Physical Plan:
EXPLAIN SELECT env, count(*) FROM dim_parquet GROUP BY env;
physical_plan
01)ProjectionExec: expr=[env@0 as env, count(Int64(1))@1 as count(*)]
02)--AggregateExec: mode=FinalPartitioned, gby=[env@0 as env], aggr=[count(Int64(1))]
03)----DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/aggregate_repartition/dim.parquet]]}, projection=[env], file_type=parquet
Additional context
The problem originates in enforce_distribution.rs::get_repartition_requirement_status.
Here round-robin partitioning is added based on the file's statistics:
- We are optimistic with CSV files and return true, since CSV sources report no statistics. We could look into a better way to estimate whether a CSV file needs repartitioning at the file level.
- We have exact statistics for Parquet files, so we do not round-robin repartition at the file level, but we still hash repartition later in the query. We can look at how hash repartitioning is handled based on file statistics and improve this.
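To make the proposed fix concrete, here is a minimal, hypothetical sketch of the kind of row-count gate that could be consulted before inserting a RepartitionExec. None of the types or names below are DataFusion's actual API (get_repartition_requirement_status works on plan properties, not this struct); `SourceStats`, `should_repartition`, and the `min_rows` threshold are illustrative stand-ins only.

```rust
/// Hypothetical stand-in for a source's statistics: `Some(n)` when the row
/// count is known exactly (e.g. from Parquet metadata), `None` when it is
/// unknown (e.g. CSV, which reports no statistics).
#[derive(Debug, Clone, Copy)]
struct SourceStats {
    exact_row_count: Option<usize>,
}

/// Returns true only when repartitioning is likely to pay off: either the
/// row count is unknown (stay optimistic, matching today's behavior) or it
/// meets a minimum threshold, such as the configured batch size.
fn should_repartition(stats: SourceStats, min_rows: usize) -> bool {
    match stats.exact_row_count {
        None => true, // no statistics: keep the current optimistic behavior
        Some(rows) => rows >= min_rows,
    }
}

fn main() {
    let csv = SourceStats { exact_row_count: None };
    let tiny_parquet = SourceStats { exact_row_count: Some(5) };
    let big_parquet = SourceStats { exact_row_count: Some(1_000_000) };

    // With a threshold of 8192 rows (the default target_batch_size), only
    // the large source would be repartitioned; the 5-row Parquet table
    // from the reproduction above would not.
    println!("{}", should_repartition(csv, 8192)); // true
    println!("{}", should_repartition(tiny_parquet, 8192)); // false
    println!("{}", should_repartition(big_parquet, 8192)); // true
}
```

Under this sketch, both the round-robin and the hash repartition steps for the 5-row tables would be skipped when exact statistics are available, while unknown-size sources keep the existing optimistic path.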