Skip to content

Commit

Permalink
fix source
Browse files Browse the repository at this point in the history
  • Loading branch information
ritchie46 committed Mar 12, 2024
1 parent e30f1ec commit bc9f37d
Showing 1 changed file with 45 additions and 24 deletions.
69 changes: 45 additions & 24 deletions crates/polars-pipe/src/executors/sinks/sort/source.rs
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,24 @@ impl SortSource {
eprintln!("full ooc sort took: {:?}", self.ooc_start.elapsed());
}
}

fn get_from_memory(
&mut self,
read: &mut Vec<DataFrame>,
read_size: &mut usize,
part: usize,
keep_track: bool,
) {
while self.current_part <= part {
if let Some(df) = self.partition_spiller.get(self.current_part - 1) {
if keep_track {
*read_size += df.estimated_size();
}
read.push(df);
}
self.current_part += 1;
}
}
}

impl Source for SortSource {
Expand All @@ -128,35 +146,38 @@ impl Source for SortSource {
self.print_verbose(context.verbose);
return Ok(SourceResult::Finished);
}

let check_in_mem = if let Some((part, _)) = self.files.peek() {
*part as usize != self.current_part
} else {
true
};

if check_in_mem {
let df = self.partition_spiller.get(self.current_part).unwrap();
self.current_part += 1;
return self.finish_from_df(df);
}
self.current_part += 1;
let mut read_size = 0;
let mut read = vec![];

match self.files.next() {
None => {
self.print_verbose(context.verbose);
Ok(SourceResult::Finished)
// Ensure we fetch all from memory.
self.get_from_memory(
&mut read,
&mut read_size,
self.partition_spiller.len(),
false,
);
if read.is_empty() {
self.print_verbose(context.verbose);
Ok(SourceResult::Finished)
} else {
self.finished = true;
let df = accumulate_dataframes_vertical_unchecked(read);
self.finish_from_df(df)
}
},
Some((partition, mut path)) => {
Some((mut partition, mut path)) => {
self.get_from_memory(&mut read, &mut read_size, partition as usize, true);
let limit = self.memtrack.get_available() / 3;

let mut read_size = 0;
let mut read = vec![];

if let Some(in_mem) = self.partition_spiller.get(partition as usize) {
read_size += in_mem.estimated_size();
read.push(in_mem)
}
loop {
if let Some(in_mem) = self.partition_spiller.get(partition as usize) {
read_size += in_mem.estimated_size();
read.push(in_mem)
}

let files = std::fs::read_dir(&path)?.collect::<std::io::Result<Vec<_>>>()?;

// read the files in a single partition in parallel
Expand All @@ -177,15 +198,15 @@ impl Source for SortSource {
break;
}

let Some((_, next_path)) = self.files.next() else {
let Some((next_part, next_path)) = self.files.next() else {
break;
};
path = next_path;
partition = next_part;
}
let df = accumulate_dataframes_vertical_unchecked(read);
let out = self.finish_from_df(df);
self.io_thread.clean(path);
self.current_part += 1;
out
},
}
Expand Down

0 comments on commit bc9f37d

Please sign in to comment.