Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Support intersect all and except distinct/all in DataFrame API #3537

Open
wants to merge 3 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions daft/daft/__init__.pyi
Original file line number Diff line number Diff line change
Expand Up @@ -1634,6 +1634,7 @@ class LogicalPlanBuilder:
) -> LogicalPlanBuilder: ...
def concat(self, other: LogicalPlanBuilder) -> LogicalPlanBuilder: ...
def intersect(self, other: LogicalPlanBuilder, is_all: bool) -> LogicalPlanBuilder: ...
def except_(self, other: LogicalPlanBuilder, is_all: bool) -> LogicalPlanBuilder: ...
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

if there's a better name for this, it would be really appreciated.

def add_monotonically_increasing_id(self, column_name: str | None) -> LogicalPlanBuilder: ...
def table_write(
self,
Expand Down
88 changes: 88 additions & 0 deletions daft/dataframe/dataframe.py
Original file line number Diff line number Diff line change
Expand Up @@ -2542,6 +2542,94 @@ def intersect(self, other: "DataFrame") -> "DataFrame":
builder = self._builder.intersect(other._builder)
return DataFrame(builder)

@DataframePublicAPI
def intersect_all(self, other: "DataFrame") -> "DataFrame":
"""Returns the intersection of two DataFrames, including duplicates.

Example:
>>> import daft
>>> df1 = daft.from_pydict({"a": [1, 2, 2], "b": [4, 6, 6]})
>>> df2 = daft.from_pydict({"a": [1, 1, 2, 2], "b": [4, 4, 6, 6]})
>>> df1.intersect_all(df2).sort("a").collect()
╭───────┬───────╮
│ a ┆ b │
│ --- ┆ --- │
│ Int64 ┆ Int64 │
╞═══════╪═══════╡
│ 1 ┆ 4 │
├╌╌╌╌╌╌╌┼╌╌╌╌╌╌╌┤
│ 2 ┆ 6 │
├╌╌╌╌╌╌╌┼╌╌╌╌╌╌╌┤
│ 2 ┆ 6 │
╰───────┴───────╯
<BLANKLINE>
(Showing first 3 of 3 rows)

Args:
other (DataFrame): DataFrame to intersect with

Returns:
DataFrame: DataFrame with the intersection of the two DataFrames, including duplicates
"""
builder = self._builder.intersect_all(other._builder)
return DataFrame(builder)

@DataframePublicAPI
def except_distinct(self, other: "DataFrame") -> "DataFrame":
"""Returns the set difference of two DataFrames.

Example:
>>> import daft
>>> df1 = daft.from_pydict({"a": [1, 2, 3], "b": [4, 5, 6]})
>>> df2 = daft.from_pydict({"a": [1, 2, 3], "b": [4, 8, 6]})
>>> df1.except_distinct(df2).collect()
╭───────┬───────╮
│ a ┆ b │
│ --- ┆ --- │
│ Int64 ┆ Int64 │
╞═══════╪═══════╡
│ 2 ┆ 5 │
╰───────┴───────╯
<BLANKLINE>
(Showing first 1 of 1 rows)

Args:
other (DataFrame): DataFrame to except with

Returns:
DataFrame: DataFrame with the set difference of the two DataFrames
"""
builder = self._builder.except_distinct(other._builder)
return DataFrame(builder)

@DataframePublicAPI
def except_all(self, other: "DataFrame") -> "DataFrame":
"""Returns the set difference of two DataFrames, considering duplicates.

Example:
>>> import daft
>>> df1 = daft.from_pydict({"a": [1, 1, 2, 2], "b": [4, 4, 6, 6]})
>>> df2 = daft.from_pydict({"a": [1, 2, 2], "b": [4, 6, 6]})
>>> df1.except_all(df2).collect()
╭───────┬───────╮
│ a ┆ b │
│ --- ┆ --- │
│ Int64 ┆ Int64 │
╞═══════╪═══════╡
│ 1 ┆ 4 │
╰───────┴───────╯
<BLANKLINE>
(Showing first 1 of 1 rows)

Args:
other (DataFrame): DataFrame to except with

Returns:
DataFrame: DataFrame with the set difference of the two DataFrames, considering duplicates
"""
builder = self._builder.except_all(other._builder)
return DataFrame(builder)

def _materialize_results(self) -> None:
"""Materializes the results of for this DataFrame and hold a pointer to the results."""
context = get_context()
Expand Down
12 changes: 12 additions & 0 deletions daft/logical/builder.py
Original file line number Diff line number Diff line change
Expand Up @@ -279,6 +279,18 @@ def intersect(self, other: LogicalPlanBuilder) -> LogicalPlanBuilder:
builder = self._builder.intersect(other._builder, False)
return LogicalPlanBuilder(builder)

def intersect_all(self, other: LogicalPlanBuilder) -> LogicalPlanBuilder:
builder = self._builder.intersect(other._builder, True)
return LogicalPlanBuilder(builder)

def except_distinct(self, other: LogicalPlanBuilder) -> LogicalPlanBuilder:
builder = self._builder.except_(other._builder, False)
return LogicalPlanBuilder(builder)

def except_all(self, other: LogicalPlanBuilder) -> LogicalPlanBuilder:
builder = self._builder.except_(other._builder, True)
return LogicalPlanBuilder(builder)

def add_monotonically_increasing_id(self, column_name: str | None) -> LogicalPlanBuilder:
builder = self._builder.add_monotonically_increasing_id(column_name)
return LogicalPlanBuilder(builder)
Expand Down
46 changes: 45 additions & 1 deletion src/daft-core/src/array/ops/list.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use std::{iter::repeat, sync::Arc};

use arrow2::offset::OffsetsBuffer;
use arrow2::offset::{Offsets, OffsetsBuffer};
use common_error::DaftResult;
use indexmap::{
map::{raw_entry_v1::RawEntryMut, RawEntryApiV1},
Expand Down Expand Up @@ -255,6 +255,31 @@ fn list_sort_helper_fixed_size(
.collect()
}

fn general_list_fill_helper(element: &Series, num_array: &Int64Array) -> DaftResult<Vec<Series>> {
let num_iter = create_iter(num_array, element.len());
let mut result = Vec::with_capacity(element.len());
let element_data = element.as_physical()?;
for (row_index, num) in num_iter.enumerate() {
let list_arr = if element.is_valid(row_index) {
let mut list_growable = make_growable(
element.name(),
element.data_type(),
vec![&element_data],
false,
num as usize,
);
for _ in 0..num {
list_growable.extend(0, row_index, 1);
}
list_growable.build()?
} else {
Series::full_null(element.name(), element.data_type(), num as usize)
};
result.push(list_arr);
}
Ok(result)
}

impl ListArray {
pub fn value_counts(&self) -> DaftResult<MapArray> {
struct IndexRef {
Expand Down Expand Up @@ -625,6 +650,25 @@ impl ListArray {
self.validity().cloned(),
))
}

pub fn list_fill(elem: &Series, num_array: &Int64Array) -> DaftResult<Self> {
let generated = general_list_fill_helper(elem, num_array)?;
let generated_refs: Vec<&Series> = generated.iter().collect();
let lengths = generated.iter().map(|arr| arr.len());
let offsets = Offsets::try_from_lengths(lengths)?;
let flat_child = if generated_refs.is_empty() {
// when there's no output, we should create an empty series
Series::empty(elem.name(), elem.data_type())
} else {
Series::concat(&generated_refs)?
};
Ok(Self::new(
elem.field().to_list_field()?,
flat_child,
offsets.into(),
None,
))
}
}

impl FixedSizeListArray {
Expand Down
13 changes: 12 additions & 1 deletion src/daft-core/src/series/ops/list.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,9 @@ use common_error::{DaftError, DaftResult};
use daft_schema::field::Field;

use crate::{
array::ListArray,
datatypes::{DataType, UInt64Array, Utf8Array},
prelude::CountMode,
prelude::{CountMode, Int64Array},
series::{IntoSeries, Series},
};

Expand Down Expand Up @@ -217,4 +218,14 @@ impl Series {
))),
}
}

/// Given a series of data T, repeat each data T with num times to create a list, returns
/// a series of repeated list.
/// # Example
/// ```txt
/// repeat([1, 2, 3], [2, 0, 1]) --> [[1, 1], [], [3]]
/// ```
pub fn list_fill(&self, num: &Int64Array) -> DaftResult<Self> {
ListArray::list_fill(self, num).map(|arr| arr.into_series())
}
}
Loading
Loading