Skip to content

Commit

Permalink
refactor(rust): Remove short-lived / non-CPU bound task spawns on asy…
Browse files Browse the repository at this point in the history
…nc executor in new-streaming (#18764)
  • Loading branch information
nameexhaustion authored Sep 18, 2024
1 parent 16623bb commit d11c5c3
Show file tree
Hide file tree
Showing 4 changed files with 35 additions and 55 deletions.
32 changes: 14 additions & 18 deletions crates/polars-stream/src/nodes/parquet_source/init.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ use crate::async_primitives::connector::connector;
use crate::async_primitives::wait_group::{WaitGroup, WaitToken};
use crate::morsel::get_ideal_morsel_size;
use crate::nodes::{MorselSeq, TaskPriority};
use crate::utils::task_handles_ext;

impl ParquetSourceNode {
/// # Panics
Expand All @@ -36,7 +37,7 @@ impl ParquetSourceNode {
// Safety
// * We dropped the receivers on the line above
// * This function is only called once.
morsel_stream_task_handle.await
morsel_stream_task_handle.await.unwrap()
}

pub(super) fn shutdown(&self) -> impl Future<Output = PolarsResult<()>> {
Expand All @@ -61,15 +62,16 @@ impl ParquetSourceNode {
.spawn(Self::shutdown_impl(async_task_data, self.verbose));
}

/// Constructs the task that provides a morsel stream.
/// Constructs the task that distributes morsels across the engine pipelines.
#[allow(clippy::type_complexity)]
pub(super) fn init_raw_morsel_stream(
pub(super) fn init_raw_morsel_distributor(
&mut self,
) -> (
Vec<crate::async_primitives::connector::Receiver<(DataFrame, MorselSeq, WaitToken)>>,
async_executor::AbortOnDropHandle<PolarsResult<()>>,
task_handles_ext::AbortOnDropHandle<PolarsResult<()>>,
) {
let verbose = self.verbose;
let io_runtime = polars_io::pl_async::get_runtime();

let use_statistics = self.options.use_statistics;

Expand All @@ -79,10 +81,7 @@ impl ParquetSourceNode {
if let Some((_, 0)) = self.file_options.slice {
return (
raw_morsel_receivers,
async_executor::AbortOnDropHandle::new(async_executor::spawn(
TaskPriority::Low,
std::future::ready(Ok(())),
)),
task_handles_ext::AbortOnDropHandle(io_runtime.spawn(std::future::ready(Ok(())))),
);
}

Expand Down Expand Up @@ -126,12 +125,9 @@ impl ParquetSourceNode {
let row_group_decoder = self.init_row_group_decoder();
let row_group_decoder = Arc::new(row_group_decoder);

// Processes row group metadata and spawns I/O tasks to fetch row group data. This is
// currently spawned onto the CPU runtime as it does not directly make any async I/O calls,
// but instead it potentially performs predicate/slice evaluation on metadata. If we observe
// that under heavy CPU load scenarios the I/O throughput drops due to this task not being
// scheduled we can change it to be a high priority task.
let morsel_stream_task_handle = async_executor::spawn(TaskPriority::Low, async move {
// Distributes morsels across pipelines. This does not perform any CPU or I/O bound work -
// it is purely a dispatch loop.
let raw_morsel_distributor_task_handle = io_runtime.spawn(async move {
let slice_range = {
let Ok(slice) = normalized_slice_oneshot_rx.await else {
// If we are here then the producer probably errored.
Expand Down Expand Up @@ -177,7 +173,7 @@ impl ParquetSourceNode {
.into_stream()
.map(|x| async {
match x {
Ok(handle) => handle.await,
Ok(handle) => handle.await.unwrap(),
Err(e) => Err(e),
}
})
Expand Down Expand Up @@ -258,10 +254,10 @@ impl ParquetSourceNode {
metadata_task_handle.await.unwrap()
});

let morsel_stream_task_handle =
async_executor::AbortOnDropHandle::new(morsel_stream_task_handle);
let raw_morsel_distributor_task_handle =
task_handles_ext::AbortOnDropHandle(raw_morsel_distributor_task_handle);

(raw_morsel_receivers, morsel_stream_task_handle)
(raw_morsel_receivers, raw_morsel_distributor_task_handle)
}

/// Creates a `RowGroupDecoder` that turns `RowGroupData` into DataFrames.
Expand Down
9 changes: 5 additions & 4 deletions crates/polars-stream/src/nodes/parquet_source/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,9 @@ use polars_plan::prelude::FileScanOptions;

use super::compute_node_prelude::*;
use super::{MorselSeq, TaskPriority};
use crate::async_executor::{self};
use crate::async_primitives::wait_group::WaitToken;
use crate::morsel::SourceToken;
use crate::utils::task_handles_ext;

mod init;
mod mem_prefetch_funcs;
Expand All @@ -30,7 +30,7 @@ mod row_group_decode;

type AsyncTaskData = Option<(
Vec<crate::async_primitives::connector::Receiver<(DataFrame, MorselSeq, WaitToken)>>,
async_executor::AbortOnDropHandle<PolarsResult<()>>,
task_handles_ext::AbortOnDropHandle<PolarsResult<()>>,
)>;

#[allow(clippy::type_complexity)]
Expand Down Expand Up @@ -157,12 +157,13 @@ impl ComputeNode for ParquetSourceNode {
self.init_projected_arrow_schema();
self.physical_predicate = self.predicate.clone().map(phys_expr_to_io_expr);

let (raw_morsel_receivers, morsel_stream_task_handle) = self.init_raw_morsel_stream();
let (raw_morsel_receivers, raw_morsel_distributor_task_handle) =
self.init_raw_morsel_distributor();

self.async_task_data
.try_lock()
.unwrap()
.replace((raw_morsel_receivers, morsel_stream_task_handle));
.replace((raw_morsel_receivers, raw_morsel_distributor_task_handle));
}

fn update_state(&mut self, recv: &mut [PortState], send: &mut [PortState]) -> PolarsResult<()> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,6 @@ use polars_utils::IdxSize;

use super::mem_prefetch_funcs;
use super::row_group_decode::SharedFileState;
use crate::async_executor;
use crate::nodes::TaskPriority;
use crate::utils::task_handles_ext;

/// Represents byte-data that can be transformed into a DataFrame after some computation.
Expand Down Expand Up @@ -81,7 +79,7 @@ impl RowGroupDataFetcher {

pub(super) async fn next(
&mut self,
) -> Option<PolarsResult<async_executor::AbortOnDropHandle<PolarsResult<RowGroupData>>>> {
) -> Option<PolarsResult<task_handles_ext::AbortOnDropHandle<PolarsResult<RowGroupData>>>> {
'main: loop {
for row_group_metadata in self.current_row_groups.by_ref() {
let current_row_offset = self.current_row_offset;
Expand Down Expand Up @@ -164,9 +162,7 @@ impl RowGroupDataFetcher {
let current_path_index = self.current_path_index;
let current_max_row_group_height = self.current_max_row_group_height;

// Push calculation of byte ranges to a task to run in parallel, as it can be
// expensive for very wide tables and projections.
let handle = async_executor::spawn(TaskPriority::Low, async move {
let handle = io_runtime.spawn(async move {
let fetched_bytes = if let DynByteSource::MemSlice(mem_slice) =
current_byte_source.as_ref()
{
Expand Down Expand Up @@ -205,16 +201,9 @@ impl RowGroupDataFetcher {
&row_group_metadata,
columns.as_ref(),
)
.collect::<Arc<[_]>>();
.collect::<Vec<_>>();

let bytes = {
let ranges_2 = ranges.clone();
task_handles_ext::AbortOnDropHandle(io_runtime.spawn(async move {
current_byte_source.get_ranges(ranges_2.as_ref()).await
}))
.await
.unwrap()?
};
let bytes = current_byte_source.get_ranges(ranges.as_ref()).await?;

assert_eq!(bytes.len(), ranges.len());

Expand Down Expand Up @@ -261,7 +250,7 @@ impl RowGroupDataFetcher {
})
});

let handle = async_executor::AbortOnDropHandle::new(handle);
let handle = task_handles_ext::AbortOnDropHandle(handle);
return Some(Ok(handle));
}

Expand Down Expand Up @@ -302,12 +291,12 @@ type RowGroupDataStreamFut = std::pin::Pin<Box<
dyn Future<
Output =
(
Box<RowGroupDataFetcher> ,
Option <
PolarsResult <
async_executor::AbortOnDropHandle <
PolarsResult <
RowGroupData > > > >
Box<RowGroupDataFetcher> ,
Option <
PolarsResult <
task_handles_ext::AbortOnDropHandle <
PolarsResult <
RowGroupData > > > >
)
> + Send
>>;
Expand Down Expand Up @@ -335,7 +324,7 @@ impl RowGroupDataStream {
}

impl futures::stream::Stream for RowGroupDataStream {
type Item = PolarsResult<async_executor::AbortOnDropHandle<PolarsResult<RowGroupData>>>;
type Item = PolarsResult<task_handles_ext::AbortOnDropHandle<PolarsResult<RowGroupData>>>;

fn poll_next(
mut self: std::pin::Pin<&mut Self>,
Expand Down
14 changes: 4 additions & 10 deletions crates/polars-stream/src/nodes/parquet_source/row_group_decode.rs
Original file line number Diff line number Diff line change
Expand Up @@ -66,19 +66,17 @@ impl RowGroupDecoder {

let mut out_columns = Vec::with_capacity(out_width);

if self.row_index.is_some() {
// Add a placeholder so that we don't have to shift the entire vec
// later.
out_columns.push(Column::default());
}

let slice_range = row_group_data
.slice
.map(|(offset, len)| offset..offset + len)
.unwrap_or(0..row_group_data.row_group_metadata.num_rows());

assert!(slice_range.end <= row_group_data.row_group_metadata.num_rows());

if let Some(s) = self.materialize_row_index(row_group_data.as_ref(), slice_range.clone())? {
out_columns.push(s);
}

self.decode_all_columns(
&mut out_columns,
&row_group_data,
Expand All @@ -93,10 +91,6 @@ impl RowGroupDecoder {
out_columns.last().unwrap().len()
};

if let Some(s) = self.materialize_row_index(row_group_data.as_ref(), slice_range)? {
out_columns[0] = s;
}

let shared_file_state = row_group_data
.shared_file_state
.get_or_init(|| self.shared_file_state_init_func(&row_group_data))
Expand Down

0 comments on commit d11c5c3

Please sign in to comment.