Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Data] Re-implement APIs like select_columns with PyArrow batch format #48140

Open
wants to merge 33 commits into
base: master
Choose a base branch
from

Conversation

ArturNiederfahrenhorst
Copy link
Contributor

@ArturNiederfahrenhorst ArturNiederfahrenhorst commented Oct 21, 2024

Related issue number

Closes #48090

Prerequisite: #48575

@ArturNiederfahrenhorst
Copy link
Contributor Author

Looking at the failed test...

python/ray/data/dataset.py Outdated Show resolved Hide resolved
@ArturNiederfahrenhorst
Copy link
Contributor Author

I'll rebase once the fix is in and mongodb test should pass

Copy link
Contributor

@alexeykudinkin alexeykudinkin left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@ArturNiederfahrenhorst please hold on landing this one

python/ray/data/dataset.py Outdated Show resolved Hide resolved
Comment on lines 701 to 702
Callable[["pandas.DataFrame"], "pandas.Series"],
Callable[["pyarrow.Table"], "pyarrow.Array"],
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

These API has to be consistent with map_batches for both inputs and outputs

Copy link
Member

@bveeramani bveeramani Nov 6, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It wasn't consistent to begin with.

fn: Callable[["pandas.DataFrame"], "pandas.Series"],
*,

Without making breaking changes, what should the type of fn be?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Understood. That could not be an excuse to not make it right, though.

Should be matching the map_batches (REF):

DataBatch = Union["pyarrow.Table", "pandas.DataFrame", Dict[str, np.ndarray]]

(ie also permit ndarray to be accepted/returned)

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What would the return type of the callable be? Currently it's pandas.Series. Changing the return type from Series to DataFrame would be a breaking change

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Map batches accepts "table"-like structure, here we expect list of column values so it's not to replace with DataFrame literally, but instead align the APIs

  • map_batches accepts: DataBatch = Union["pyarrow.Table", "pandas.DataFrame", Dict[str, np.ndarray]]
  • add_column should accept Union[pa.Array, pandas.Seriers, ndarray]

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@alexeykudinkin Do you want us to add the numpy functionality in this PR as well for consistency with map_batches?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Discussed offline with @alexeykudinkin -- let's do Callable[[DataBatch], Union[pa.Array, pd.Series, ndarray]].

That would not be correct because that term would allow for example Callable[[pyarrow.Table], ndarray]] which I don't think we want to allow?

It's weird, but map_batches allows you to change the batch format. Something like this is valid:

def udf(batch: pa.Table) -> Dict[str, np.ndarray]
    ...

ds.map_batches(udf, batch_format="pyarrow")

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@bveeramani this happens b/c Arrow is able to do zero-copy from ndarray (with some exceptions)

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Oh, I meant more from like an interface perspective. At least personally, I found it unexpected that I could do an Arrow table as input and a DataFrame as output (not that it's necessarily an issue)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for the input guys. I've made the change. Waiting for CI...

)

assert ds.count() == 5
assert ds.schema().names == ["_id", "float_field", "int_field"]
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Made these changes to decouple them from the string representation which may vary over versions. On my local environment, it was different then here/CI.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

great!

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nice

@@ -362,7 +383,7 @@ def test_drop_columns(ray_start_regular_shared, tmp_path):
assert ds.drop_columns(["col2"]).take(1) == [{"col1": 1, "col3": 3}]
assert ds.drop_columns(["col1", "col3"]).take(1) == [{"col2": 2}]
assert ds.drop_columns([]).take(1) == [{"col1": 1, "col2": 2, "col3": 3}]
assert ds.drop_columns(["col1", "col2", "col3"]).take(1) == [{}]
assert ds.drop_columns(["col1", "col2", "col3"]).take(1) == []
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

As discussed offline, this behavior is arbitrary and probably has little practical relevance.
Since our pyarrow implementation of the drop operation returns an empty list, we decided to just change the test in this case.

def add_column(batch: "pandas.DataFrame") -> "pandas.DataFrame":
batch.loc[:, col] = fn(batch)
return batch
def add_column(batch: "pyarrow.Table") -> "pyarrow.Table":
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

the typing here is off - batch is DataBatch type right? for example if it is pandas

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks!

Comment on lines 781 to 789
if batch_format not in [
"pandas",
"pyarrow",
]:
raise ValueError(
f"batch_format argument must be 'pandas' or 'pyarrow', "
f"got: {batch_format}"
)

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think you need to validate here, should happen in map_batches

Comment on lines +843 to +846
# Historically, we have also accepted lists with duplicate column names.
# This is not tolerated by the underlying pyarrow.Table.drop_columns method.
cols_without_duplicates = list(set(cols))

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

i think we should just enforce this via validation / raise an error

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is a breaking change then!
Still?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think it's fine, yes.

python/ray/data/dataset.py Outdated Show resolved Hide resolved
Comment on lines 781 to 788
if batch_format not in [
"pandas",
"pyarrow",
]:
raise ValueError(
f"batch_format argument must be 'pandas' or 'pyarrow', "
f"got: {batch_format}"
)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Any reason we can't support the numpy batch format?

Comment on lines +775 to +776
# Create a new table with the updated column
return batch.set_column(column_idx, col, column)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we either error or emit a warning here? Overriding a column might be unexpected

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@bveeramani Does Ray Data have existing helpers to log this without spamming?
I'd do the same for numpy, pandas and arrow then.

)

assert ds.count() == 5
assert ds.schema().names == ["_id", "float_field", "int_field"]
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nice

Co-authored-by: Balaji Veeramani <[email protected]>
Signed-off-by: Artur Niederfahrenhorst <[email protected]>
@richardliaw richardliaw added the go add ONLY when ready to merge, run all tests label Nov 19, 2024
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
go add ONLY when ready to merge, run all tests
Projects
None yet
Development

Successfully merging this pull request may close these issues.

[Data] Re-implement APIs like select_columns with PyArrow batch format
4 participants