Skip to content

Commit

Permalink
feat: Add maintain_order parameter to joins (#20026)
Browse files Browse the repository at this point in the history
  • Loading branch information
stijnherfst authored Dec 6, 2024
1 parent abbad69 commit e9ddd37
Show file tree
Hide file tree
Showing 17 changed files with 489 additions and 95 deletions.
2 changes: 1 addition & 1 deletion crates/polars-core/src/chunked_array/ops/sort/options.rs
Original file line number Diff line number Diff line change
Expand Up @@ -149,7 +149,7 @@ impl SortMultipleOptions {
self
}

/// Implement order for all columns. Default `false`.
/// Sort order for all columns. Default `false` which is ascending.
pub fn with_order_descending(mut self, descending: bool) -> Self {
self.descending = vec![descending];
self
Expand Down
1 change: 1 addition & 0 deletions crates/polars-core/src/frame/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2184,6 +2184,7 @@ impl DataFrame {

/// Return a sorted clone of this [`DataFrame`].
///
/// In many cases the output chunks will be contiguous in memory, but this is not guaranteed.
///
/// # Example
///
/// Sort by a single column with default options:
Expand Down
16 changes: 13 additions & 3 deletions crates/polars-lazy/src/frame/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ use polars_core::prelude::*;
use polars_expr::{create_physical_expr, ExpressionConversionState};
use polars_io::RowIndex;
use polars_mem_engine::{create_physical_plan, Executor};
use polars_ops::frame::JoinCoalesce;
use polars_ops::frame::{JoinCoalesce, MaintainOrderJoin};
#[cfg(feature = "is_between")]
use polars_ops::prelude::ClosedInterval;
pub use polars_plan::frame::{AllowedOptimizations, OptFlags};
Expand Down Expand Up @@ -1998,8 +1998,9 @@ pub struct JoinBuilder {
force_parallel: bool,
suffix: Option<PlSmallStr>,
validation: JoinValidation,
coalesce: JoinCoalesce,
join_nulls: bool,
coalesce: JoinCoalesce,
maintain_order: MaintainOrderJoin,
}
impl JoinBuilder {
/// Create the `JoinBuilder` with the provided `LazyFrame` as the left table.
Expand All @@ -2012,10 +2013,11 @@ impl JoinBuilder {
right_on: vec![],
allow_parallel: true,
force_parallel: false,
join_nulls: false,
suffix: None,
validation: Default::default(),
join_nulls: false,
coalesce: Default::default(),
maintain_order: Default::default(),
}
}

Expand Down Expand Up @@ -2096,6 +2098,12 @@ impl JoinBuilder {
self
}

/// Select which input's row order, if any, the join result should preserve.
///
/// The value is stored on the builder and copied into the join arguments when the
/// builder is finished. If this method is never called, the builder keeps the
/// `Default` value of [`MaintainOrderJoin`].
pub fn maintain_order(mut self, maintain_order: MaintainOrderJoin) -> Self {
self.maintain_order = maintain_order;
self
}

/// Finish builder
pub fn finish(self) -> LazyFrame {
let mut opt_state = self.lf.opt_state;
Expand All @@ -2113,6 +2121,7 @@ impl JoinBuilder {
slice: None,
join_nulls: self.join_nulls,
coalesce: self.coalesce,
maintain_order: self.maintain_order,
};

let lp = self
Expand Down Expand Up @@ -2209,6 +2218,7 @@ impl JoinBuilder {
slice: None,
join_nulls: self.join_nulls,
coalesce: self.coalesce,
maintain_order: self.maintain_order,
};
let options = JoinOptions {
allow_parallel: self.allow_parallel,
Expand Down
15 changes: 15 additions & 0 deletions crates/polars-ops/src/frame/join/args.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ pub struct JoinArgs {
pub slice: Option<(i64, usize)>,
pub join_nulls: bool,
pub coalesce: JoinCoalesce,
pub maintain_order: MaintainOrderJoin,
}

impl JoinArgs {
Expand Down Expand Up @@ -68,6 +69,18 @@ impl JoinCoalesce {
}
}

/// Which side's row order a join should preserve in its output.
///
/// Derives `IntoStaticStr` with `serialize_all = "snake_case"`, so each variant
/// converts to a snake_case static string (e.g. `LeftRight` -> `"left_right"`).
///
/// NOTE(review): the per-variant descriptions below are inferred from the variant
/// names and from how the left-join dispatch sorts the join indices; confirm they
/// hold for every join type.
#[derive(Copy, Clone, PartialEq, Eq, Debug, Hash, Default, IntoStaticStr)]
#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))]
#[strum(serialize_all = "snake_case")]
pub enum MaintainOrderJoin {
/// No particular row order is requested. This is the default.
#[default]
None,
/// Preserve the row order of the left table.
Left,
/// Preserve the row order of the right table.
Right,
/// Preserve the left table's order first, then the right table's.
LeftRight,
/// Preserve the right table's order first, then the left table's.
RightLeft,
}

impl Default for JoinArgs {
fn default() -> Self {
Self {
Expand All @@ -77,6 +90,7 @@ impl Default for JoinArgs {
slice: None,
join_nulls: false,
coalesce: Default::default(),
maintain_order: Default::default(),
}
}
}
Expand All @@ -90,6 +104,7 @@ impl JoinArgs {
slice: None,
join_nulls: false,
coalesce: Default::default(),
maintain_order: Default::default(),
}
}

Expand Down
222 changes: 165 additions & 57 deletions crates/polars-ops/src/frame/join/dispatch_left_right.rs
Original file line number Diff line number Diff line change
Expand Up @@ -67,82 +67,190 @@ pub fn materialize_left_join_from_series(
s_right = s_right.rechunk();
}

let ids = sort_or_hash_left(&s_left, &s_right, verbose, args.validation, args.join_nulls)?;
// The current sort_or_hash_left implementation preserves the Left DataFrame order so skip left for now.
let requires_ordering = matches!(
args.maintain_order,
MaintainOrderJoin::Right | MaintainOrderJoin::RightLeft
);
if requires_ordering {
// When ordering we rechunk the series so we don't get ChunkIds as output
s_left = s_left.rechunk();
s_right = s_right.rechunk();
}

let (left_idx, right_idx) =
sort_or_hash_left(&s_left, &s_right, verbose, args.validation, args.join_nulls)?;

let right = if let Some(drop_names) = drop_names {
right.drop_many(drop_names)
} else {
right.drop(s_right.name()).unwrap()
};
Ok(materialize_left_join(&left, &right, ids, args))

#[cfg(feature = "chunked_ids")]
match (left_idx, right_idx) {
(ChunkJoinIds::Left(left_idx), ChunkJoinOptIds::Left(right_idx)) => {
if requires_ordering {
Ok(maintain_order_idx(
&left,
&right,
left_idx.as_slice(),
right_idx.as_slice(),
args,
))
} else {
Ok(POOL.join(
|| materialize_left_join_idx_left(&left, left_idx.as_slice(), args),
|| materialize_left_join_idx_right(&right, right_idx.as_slice(), args),
))
}
},
(ChunkJoinIds::Left(left_idx), ChunkJoinOptIds::Right(right_idx)) => Ok(POOL.join(
|| materialize_left_join_idx_left(&left, left_idx.as_slice(), args),
|| materialize_left_join_chunked_right(&right, right_idx.as_slice(), args),
)),
(ChunkJoinIds::Right(left_idx), ChunkJoinOptIds::Right(right_idx)) => Ok(POOL.join(
|| materialize_left_join_chunked_left(&left, left_idx.as_slice(), args),
|| materialize_left_join_chunked_right(&right, right_idx.as_slice(), args),
)),
(ChunkJoinIds::Right(left_idx), ChunkJoinOptIds::Left(right_idx)) => Ok(POOL.join(
|| materialize_left_join_chunked_left(&left, left_idx.as_slice(), args),
|| materialize_left_join_idx_right(&right, right_idx.as_slice(), args),
)),
}

#[cfg(not(feature = "chunked_ids"))]
if requires_ordering {
Ok(maintain_order_idx(
&left,
&right,
left_idx.as_slice(),
right_idx.as_slice(),
args,
))
} else {
Ok(POOL.join(
|| materialize_left_join_idx_left(&left, left_idx.as_slice(), args),
|| materialize_left_join_idx_right(&right, right_idx.as_slice(), args),
))
}
}

#[cfg(feature = "chunked_ids")]
fn materialize_left_join(
fn maintain_order_idx(
left: &DataFrame,
other: &DataFrame,
ids: LeftJoinIds,
left_idx: &[IdxSize],
right_idx: &[NullableIdxSize],
args: &JoinArgs,
) -> (DataFrame, DataFrame) {
let (left_idx, right_idx) = ids;
let materialize_left = || match left_idx {
ChunkJoinIds::Left(left_idx) => unsafe {
let mut left_idx = &*left_idx;
if let Some((offset, len)) = args.slice {
left_idx = slice_slice(left_idx, offset, len);
}
left._create_left_df_from_slice(left_idx, true, args.slice.is_some(), true)
},
ChunkJoinIds::Right(left_idx) => unsafe {
let mut left_idx = &*left_idx;
if let Some((offset, len)) = args.slice {
left_idx = slice_slice(left_idx, offset, len);
}
left.create_left_df_chunked(left_idx, true, args.slice.is_some())
},
let mut df = {
// SAFETY: left_idx and right_idx are contiguous slices that outlive the memory-mapped arrays
let left = unsafe { IdxCa::mmap_slice("a".into(), left_idx) };
let right = unsafe { IdxCa::mmap_slice("b".into(), bytemuck::cast_slice(right_idx)) };
DataFrame::new(vec![left.into_series().into(), right.into_series().into()]).unwrap()
};

let materialize_right = || match right_idx {
ChunkJoinOptIds::Left(right_idx) => unsafe {
let mut right_idx = &*right_idx;
if let Some((offset, len)) = args.slice {
right_idx = slice_slice(right_idx, offset, len);
}
IdxCa::with_nullable_idx(right_idx, |idx| other.take_unchecked(idx))
},
ChunkJoinOptIds::Right(right_idx) => unsafe {
let mut right_idx = &*right_idx;
if let Some((offset, len)) = args.slice {
right_idx = slice_slice(right_idx, offset, len);
}
other._take_opt_chunked_unchecked_hor_par(right_idx)
},
let options = SortMultipleOptions::new()
.with_order_descending(false)
.with_maintain_order(true);

let columns = match args.maintain_order {
// If the left order is preserved then there are no unsorted right rows
// So Left and LeftRight are equal
MaintainOrderJoin::Left | MaintainOrderJoin::LeftRight => vec!["a"],
MaintainOrderJoin::Right => vec!["b"],
MaintainOrderJoin::RightLeft => vec!["b", "a"],
_ => unreachable!(),
};
POOL.join(materialize_left, materialize_right)

df.sort_in_place(columns, options).unwrap();
df.rechunk_mut();

let join_tuples_left = df
.column("a")
.unwrap()
.as_series()
.unwrap()
.idx()
.unwrap()
.cont_slice()
.unwrap();

let join_tuples_right = df
.column("b")
.unwrap()
.as_series()
.unwrap()
.idx()
.unwrap()
.cont_slice()
.unwrap();

POOL.join(
|| materialize_left_join_idx_left(left, join_tuples_left, args),
|| materialize_left_join_idx_right(other, bytemuck::cast_slice(join_tuples_right), args),
)
}

#[cfg(not(feature = "chunked_ids"))]
fn materialize_left_join(
fn materialize_left_join_idx_left(
left: &DataFrame,
other: &DataFrame,
ids: LeftJoinIds,
left_idx: &[IdxSize],
args: &JoinArgs,
) -> (DataFrame, DataFrame) {
let (left_idx, right_idx) = ids;

let mut left_idx = &*left_idx;
if let Some((offset, len)) = args.slice {
left_idx = slice_slice(left_idx, offset, len);
) -> DataFrame {
let left_idx = if let Some((offset, len)) = args.slice {
slice_slice(left_idx, offset, len)
} else {
left_idx
};
unsafe {
left._create_left_df_from_slice(
left_idx,
true,
args.slice.is_some(),
matches!(
args.maintain_order,
MaintainOrderJoin::Left | MaintainOrderJoin::LeftRight
),
)
}
let materialize_left =
|| unsafe { left._create_left_df_from_slice(&left_idx, true, args.slice.is_some(), true) };
}

let mut right_idx = &*right_idx;
if let Some((offset, len)) = args.slice {
right_idx = slice_slice(right_idx, offset, len);
}
let materialize_right = || {
let right_idx = &*right_idx;
unsafe { IdxCa::with_nullable_idx(right_idx, |idx| other.take_unchecked(idx)) }
/// Build the right-hand side of a left join from nullable row indices.
///
/// When `args.slice` is set, only the `(offset, len)` window of `right_idx` is
/// used; otherwise every index is used. The selected indices are then used to
/// gather rows from `right`.
fn materialize_left_join_idx_right(
    right: &DataFrame,
    right_idx: &[NullableIdxSize],
    args: &JoinArgs,
) -> DataFrame {
    // Narrow the join indices to the requested window, if any.
    let selected = match args.slice {
        Some((offset, len)) => slice_slice(right_idx, offset, len),
        None => right_idx,
    };

    // SAFETY: not verifiable from this function alone; assumes the caller passes
    // join indices whose non-null entries are in bounds for `right` (TODO confirm
    // against the join-index producer).
    unsafe { IdxCa::with_nullable_idx(selected, |take_idx| right.take_unchecked(take_idx)) }
}
/// Build the left-hand side of a left join from chunked row ids.
///
/// Only compiled with the `chunked_ids` feature. When `args.slice` is set, only
/// the `(offset, len)` window of `left_idx` is used; whether a slice was applied
/// is forwarded to `create_left_df_chunked`.
#[cfg(feature = "chunked_ids")]
fn materialize_left_join_chunked_left(
    left: &DataFrame,
    left_idx: &[ChunkId],
    args: &JoinArgs,
) -> DataFrame {
    // Narrow the join ids to the requested window, if any.
    let selected = match args.slice {
        Some((offset, len)) => slice_slice(left_idx, offset, len),
        None => left_idx,
    };
    let was_sliced = args.slice.is_some();

    // SAFETY: not verifiable from this function alone; assumes the caller passes
    // chunk ids that are valid for `left` (TODO confirm against the join-index
    // producer).
    unsafe { left.create_left_df_chunked(selected, true, was_sliced) }
}

#[cfg(feature = "chunked_ids")]
fn materialize_left_join_chunked_right(
right: &DataFrame,
right_idx: &[ChunkId],
args: &JoinArgs,
) -> DataFrame {
let right_idx = if let Some((offset, len)) = args.slice {
slice_slice(right_idx, offset, len)
} else {
right_idx
};
POOL.join(materialize_left, materialize_right)
unsafe { right._take_opt_chunked_unchecked_hor_par(right_idx) }
}
3 changes: 2 additions & 1 deletion crates/polars-ops/src/frame/join/general.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,8 @@ fn get_suffix(suffix: Option<PlSmallStr>) -> PlSmallStr {
suffix.unwrap_or_else(|| PlSmallStr::from_static("_right"))
}

/// Utility method to finish a join.
/// Renames the columns on the right so they do not clash with the left, using the
/// specified suffix or the default (`_right`) otherwise, and then merges the right
/// dataframe into the left.
#[doc(hidden)]
pub fn _finish_join(
mut df_left: DataFrame,
Expand Down
Loading

0 comments on commit e9ddd37

Please sign in to comment.