Add feature flag DAFT_ENABLE_AGGRESSIVE_SCANTASK_SPLITTING
Jay Chia committed Dec 6, 2024
1 parent f0fdef2 commit 491054b
Showing 7 changed files with 267 additions and 74 deletions.
3 changes: 3 additions & 0 deletions daft/context.py
@@ -351,6 +351,7 @@ def set_execution_config(
shuffle_algorithm: str | None = None,
pre_shuffle_merge_threshold: int | None = None,
enable_ray_tracing: bool | None = None,
enable_aggressive_scantask_splitting: bool | None = None,
) -> DaftContext:
"""Globally sets various configuration parameters which control various aspects of Daft execution. These configuration values
are used when a Dataframe is executed (e.g. calls to `.write_*`, `.collect()` or `.show()`)
@@ -390,6 +391,7 @@ def set_execution_config(
shuffle_algorithm: The shuffle algorithm to use. Defaults to "map_reduce". Other options are "pre_shuffle_merge".
pre_shuffle_merge_threshold: Memory threshold in bytes for pre-shuffle merge. Defaults to 1GB
enable_ray_tracing: Enable tracing for Ray. Accessible in `/tmp/ray/session_latest/logs/daft` after the run completes. Defaults to False.
enable_aggressive_scantask_splitting: Enable more aggressive splitting of ScanTasks to make smaller partitions. Defaults to False.
"""
# Replace values in the DaftExecutionConfig with user-specified overrides
ctx = get_context()
@@ -418,6 +420,7 @@
shuffle_algorithm=shuffle_algorithm,
pre_shuffle_merge_threshold=pre_shuffle_merge_threshold,
enable_ray_tracing=enable_ray_tracing,
enable_aggressive_scantask_splitting=enable_aggressive_scantask_splitting,
)

ctx._daft_execution_config = new_daft_execution_config
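
As a usage sketch (the read path below is hypothetical), the new option is set like any other execution-config parameter and picked up by subsequent query executions:

import daft

# Opt in to the more aggressive ScanTask splitting pass (defaults to False).
daft.context.set_execution_config(enable_aggressive_scantask_splitting=True)

df = daft.read_parquet("s3://my-bucket/data/*.parquet")  # hypothetical path
df.collect()  # planning now routes through the v2 split-and-merge pass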
1 change: 1 addition & 0 deletions daft/daft/__init__.pyi
@@ -1829,6 +1829,7 @@ class PyDaftExecutionConfig:
enable_ray_tracing: bool | None = None,
shuffle_algorithm: str | None = None,
pre_shuffle_merge_threshold: int | None = None,
enable_aggressive_scantask_splitting: bool | None = None,
) -> PyDaftExecutionConfig: ...
@property
def scan_tasks_min_size_bytes(self) -> int: ...
9 changes: 9 additions & 0 deletions src/common/daft-config/src/lib.rs
@@ -51,6 +51,7 @@ pub struct DaftExecutionConfig {
pub shuffle_algorithm: String,
pub pre_shuffle_merge_threshold: usize,
pub enable_ray_tracing: bool,
pub enable_aggressive_scantask_splitting: bool,
}

impl Default for DaftExecutionConfig {
@@ -77,6 +78,7 @@ impl Default for DaftExecutionConfig {
shuffle_algorithm: "map_reduce".to_string(),
pre_shuffle_merge_threshold: 1024 * 1024 * 1024, // 1GB
enable_ray_tracing: false,
enable_aggressive_scantask_splitting: false,
}
}
}
@@ -110,6 +112,13 @@ impl DaftExecutionConfig {
{
cfg.enable_ray_tracing = true;
}
let enable_aggressive_scantask_splitting_env_var_name =
"DAFT_ENABLE_AGGRESSIVE_SCANTASK_SPLITTING";
if let Ok(val) = std::env::var(enable_aggressive_scantask_splitting_env_var_name)
&& matches!(val.trim().to_lowercase().as_str(), "1" | "true")
{
cfg.enable_aggressive_scantask_splitting = true;
}
cfg
}
}
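
Because the flag is also read from the environment (accepting "1" or "true" after trimming and lowercasing), it can be enabled without code changes. A sketch in Python, assuming the variable is set before Daft constructs its execution config from the environment:

import os

# Must be set before Daft reads its execution config from the environment.
os.environ["DAFT_ENABLE_AGGRESSIVE_SCANTASK_SPLITTING"] = "1"

import daft  # imported after setting the variable so the flag is visible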
10 changes: 10 additions & 0 deletions src/common/daft-config/src/python.rs
@@ -97,6 +97,7 @@ impl PyDaftExecutionConfig {
shuffle_algorithm: Option<&str>,
pre_shuffle_merge_threshold: Option<usize>,
enable_ray_tracing: Option<bool>,
enable_aggressive_scantask_splitting: Option<bool>,
) -> PyResult<Self> {
let mut config = self.config.as_ref().clone();

@@ -175,6 +176,10 @@
config.enable_ray_tracing = enable_ray_tracing;
}

if let Some(enable_aggressive_scantask_splitting) = enable_aggressive_scantask_splitting {
config.enable_aggressive_scantask_splitting = enable_aggressive_scantask_splitting;
}

Ok(Self {
config: Arc::new(config),
})
@@ -274,6 +279,11 @@
fn enable_ray_tracing(&self) -> PyResult<bool> {
Ok(self.config.enable_ray_tracing)
}

#[getter]
fn enable_aggressive_scantask_splitting(&self) -> PyResult<bool> {
Ok(self.config.enable_aggressive_scantask_splitting)
}

}

impl_bincode_py_state_serialization!(PyDaftExecutionConfig);
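
The new getter makes the value observable from Python. A small round-trip check, using the context's internal _daft_execution_config handle seen in the context.py diff above (an internal attribute, so this is illustrative only):

import daft
from daft.context import get_context

daft.context.set_execution_config(enable_aggressive_scantask_splitting=True)

# _daft_execution_config is the internal PyDaftExecutionConfig handle.
assert get_context()._daft_execution_config.enable_aggressive_scantask_splitting is True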
182 changes: 175 additions & 7 deletions src/daft-scan/src/scan_task_iters.rs
@@ -1,13 +1,18 @@
mod split_parquet_files_by_rowgroup;

use std::sync::Arc;

use common_daft_config::DaftExecutionConfig;
use common_error::{DaftError, DaftResult};
use common_file_formats::{FileFormatConfig, ParquetSourceConfig};
use common_scan_info::{ScanTaskLike, ScanTaskLikeRef, SPLIT_AND_MERGE_PASS};
use split_parquet_files_by_rowgroup::split_by_row_groups;
use daft_io::IOStatsContext;
use daft_parquet::read::read_parquet_metadata;
use parquet2::metadata::RowGroupList;

mod split_parquet_files_by_rowgroup;

use crate::{Pushdowns, ScanTask, ScanTaskRef};
use crate::{
storage_config::StorageConfig, ChunkSpec, DataSource, Pushdowns, ScanTask, ScanTaskRef,
};

pub(crate) type BoxScanTaskIter<'a> = Box<dyn Iterator<Item = DaftResult<ScanTaskRef>> + 'a>;

@@ -174,6 +179,139 @@ impl<'a> Iterator for MergeByFileSize<'a> {
}
}

#[must_use]
pub(crate) fn split_by_row_groups(
scan_tasks: BoxScanTaskIter,
max_tasks: usize,
min_size_bytes: usize,
max_size_bytes: usize,
) -> BoxScanTaskIter {
let mut scan_tasks = itertools::peek_nth(scan_tasks);

// only split if we have a small amount of files
if scan_tasks.peek_nth(max_tasks).is_some() {
Box::new(scan_tasks)
} else {
Box::new(
scan_tasks
.map(move |t| -> DaftResult<BoxScanTaskIter> {
let t = t?;

/* Only split Parquet tasks if they:
- have one source
- use a native storage config
- have no specified chunk spec or number of rows
- have a size past the split threshold
- have no Iceberg delete files
*/
if let (
FileFormatConfig::Parquet(ParquetSourceConfig {
field_id_mapping, ..
}),
StorageConfig::Native(_),
[source],
Some(None),
None,
) = (
t.file_format_config.as_ref(),
t.storage_config.as_ref(),
&t.sources[..],
t.sources.first().map(DataSource::get_chunk_spec),
t.pushdowns.limit,
) && source
.get_size_bytes()
.map_or(true, |s| s > max_size_bytes as u64)
&& source
.get_iceberg_delete_files()
.map_or(true, std::vec::Vec::is_empty)
{
let (io_runtime, io_client) =
t.storage_config.get_io_client_and_runtime()?;

let path = source.get_path();

let io_stats =
IOStatsContext::new(format!("split_by_row_groups for {path:#?}"));

let mut file = io_runtime.block_on_current_thread(read_parquet_metadata(
path,
io_client,
Some(io_stats),
field_id_mapping.clone(),
))?;

let mut new_tasks: Vec<DaftResult<ScanTaskRef>> = Vec::new();
let mut curr_row_group_indices = Vec::new();
let mut curr_row_groups = Vec::new();
let mut curr_size_bytes = 0;
let mut curr_num_rows = 0;

let row_groups = std::mem::take(&mut file.row_groups);
let num_row_groups = row_groups.len();
for (i, rg) in row_groups {
curr_row_groups.push((i, rg));
let rg = &curr_row_groups.last().unwrap().1;
curr_row_group_indices.push(i as i64);
curr_size_bytes += rg.compressed_size();
curr_num_rows += rg.num_rows();

if curr_size_bytes >= min_size_bytes || i == num_row_groups - 1 {
let mut new_source = source.clone();

if let DataSource::File {
chunk_spec,
size_bytes,
parquet_metadata,
..
} = &mut new_source
{
// only keep relevant row groups in the metadata
let row_group_list = RowGroupList::from_iter(curr_row_groups.into_iter());
let new_metadata = file.clone_with_row_groups(curr_num_rows, row_group_list);
*parquet_metadata = Some(Arc::new(new_metadata));

*chunk_spec = Some(ChunkSpec::Parquet(curr_row_group_indices));
*size_bytes = Some(curr_size_bytes as u64);
} else {
unreachable!("Parquet file format should only be used with DataSource::File");
}

if let DataSource::File {
metadata: Some(metadata),
..
} = &mut new_source
{
metadata.length = curr_num_rows;
}

// Reset accumulators
curr_row_groups = Vec::new();
curr_row_group_indices = Vec::new();
curr_size_bytes = 0;
curr_num_rows = 0;

new_tasks.push(Ok(ScanTask::new(
vec![new_source],
t.file_format_config.clone(),
t.schema.clone(),
t.storage_config.clone(),
t.pushdowns.clone(),
t.generated_fields.clone(),
)
.into()));
}
}

Ok(Box::new(new_tasks.into_iter()))
} else {
Ok(Box::new(std::iter::once(Ok(t))))
}
})
.flat_map(|t| t.unwrap_or_else(|e| Box::new(std::iter::once(Err(e))))),
)
}
}

fn split_and_merge_pass(
scan_tasks: Arc<Vec<ScanTaskLikeRef>>,
pushdowns: &Pushdowns,
@@ -193,9 +331,14 @@ fn split_and_merge_pass(
.downcast::<ScanTask>()
.map_err(|e| DaftError::TypeError(format!("Expected Arc<ScanTask>, found {:?}", e)))
}));
let split_tasks = split_by_row_groups(iter, cfg);
let merged_tasks = merge_by_sizes(split_tasks, pushdowns, cfg);
let scan_tasks: Vec<Arc<dyn ScanTaskLike>> = merged_tasks

let optimized = if cfg.enable_aggressive_scantask_splitting {
split_and_merge_pass_v2(iter, pushdowns, cfg)
} else {
split_and_merge_pass_v1(iter, pushdowns, cfg)
};

let scan_tasks: Vec<Arc<dyn ScanTaskLike>> = optimized
.map(|st| st.map(|task| task as Arc<dyn ScanTaskLike>))
.collect::<DaftResult<Vec<_>>>()?;
Ok(Arc::new(scan_tasks))
Expand All @@ -204,6 +347,31 @@ fn split_and_merge_pass(
}
}

fn split_and_merge_pass_v1<'a>(
inputs: BoxScanTaskIter<'a>,
pushdowns: &Pushdowns,
cfg: &'a DaftExecutionConfig,
) -> BoxScanTaskIter<'a> {
let split_tasks = split_by_row_groups(
inputs,
cfg.parquet_split_row_groups_max_files,
cfg.scan_tasks_min_size_bytes,
cfg.scan_tasks_max_size_bytes,
);
let merged_tasks = merge_by_sizes(split_tasks, pushdowns, cfg);
merged_tasks
}

fn split_and_merge_pass_v2<'a>(
inputs: BoxScanTaskIter<'a>,
pushdowns: &Pushdowns,
cfg: &'a DaftExecutionConfig,
) -> BoxScanTaskIter<'a> {
let split_tasks = split_parquet_files_by_rowgroup::split_all_files_by_rowgroup(inputs, cfg);
let merged_tasks = merge_by_sizes(split_tasks, pushdowns, cfg);
merged_tasks
}

#[ctor::ctor]
fn set_pass() {
let _ = SPLIT_AND_MERGE_PASS.set(&split_and_merge_pass);
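The v1 loop above packs row groups greedily: it accumulates compressed bytes until scan_tasks_min_size_bytes is reached (or the file runs out of row groups), then emits one new ScanTask per chunk. A minimal Python sketch of just that accumulation logic (names and sizes are illustrative, not Daft APIs):

def pack_row_groups(row_groups, min_size_bytes):
    """Greedily pack (index, compressed_size) pairs into task-sized chunks."""
    tasks, curr, curr_bytes = [], [], 0
    for i, (idx, size) in enumerate(row_groups):
        curr.append(idx)
        curr_bytes += size
        # Flush once the chunk is large enough, or at the final row group.
        if curr_bytes >= min_size_bytes or i == len(row_groups) - 1:
            tasks.append(curr)
            curr, curr_bytes = [], 0
    return tasks

# Three 40MB row groups with a 64MB minimum -> [[0, 1], [2]]
print(pack_row_groups([(0, 40 << 20), (1, 40 << 20), (2, 40 << 20)], 64 << 20))
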
src/daft-scan/src/scan_task_iters/split_parquet_files_by_rowgroup.rs
@@ -453,7 +453,7 @@ impl<'a> Iterator for SplitParquetFilesByRowGroups<'a> {
}

#[must_use]
pub(crate) fn split_by_row_groups<'a>(
pub(crate) fn split_all_files_by_rowgroup<'a>(
scan_tasks: BoxScanTaskIter<'a>,
config: &'a DaftExecutionConfig,
) -> BoxScanTaskIter<'a> {