-
Notifications
You must be signed in to change notification settings - Fork 654
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
FEAT-#4419: Extend virtual partitioning API to pandas on Dask #4420
Changes from 29 commits
28cdc19
15b0072
ae4eb6c
026bd14
fa377a8
b9d68f7
17a9c65
348272d
6784b4b
4efd83b
af4edbc
0ac6de3
8c44eb1
a5af1ff
971e00a
87cad7c
0de19cd
4f5dd50
f5d3eb1
5e748cd
0cc25c4
43296b2
66c4360
9f48f0d
8dab0d1
2d9f150
9a945e8
5b322ee
a761182
6a9855c
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -27,7 +27,7 @@ | |
from modin.error_message import ErrorMessage | ||
from modin.core.storage_formats.pandas.utils import compute_chunksize | ||
from modin.core.dataframe.pandas.utils import concatenate | ||
from modin.config import NPartitions, ProgressBar, BenchmarkMode | ||
from modin.config import NPartitions, ProgressBar, BenchmarkMode, Engine, StorageFormat | ||
|
||
import os | ||
|
||
|
@@ -615,11 +615,15 @@ def concat(cls, axis, left_parts, right_parts): | |
to_concat = ( | ||
[left_parts] + right_parts if left_parts.size != 0 else right_parts | ||
) | ||
return ( | ||
result = ( | ||
np.concatenate(to_concat, axis=axis) if len(to_concat) else left_parts | ||
) | ||
else: | ||
return np.append(left_parts, right_parts, axis=axis) | ||
result = np.append(left_parts, right_parts, axis=axis) | ||
if axis == 0: | ||
return cls.rebalance_partitions(result) | ||
else: | ||
return result | ||
|
||
@classmethod | ||
def to_pandas(cls, partitions): | ||
|
@@ -1292,7 +1296,15 @@ def finalize(cls, partitions): | |
@classmethod | ||
def rebalance_partitions(cls, partitions): | ||
""" | ||
Return the provided array of partitions without rebalancing it. | ||
Rebalance a 2-d array of partitions if we are using ``PandasOnRay`` or ``PandasOnDask`` executions. | ||
|
||
For all other executions, the partitions are returned unchanged. | ||
|
||
Rebalance the partitions by building a new array | ||
of partitions out of the original ones so that: | ||
|
||
- If all partitions have a length, each new partition has roughly the same number of rows. | ||
- Otherwise, each new partition spans roughly the same number of old partitions. | ||
|
||
Parameters | ||
---------- | ||
|
@@ -1302,6 +1314,103 @@ def rebalance_partitions(cls, partitions): | |
Returns | ||
------- | ||
np.ndarray | ||
The same 2-d array. | ||
A NumPy array with the same; or new, rebalanced, partitions, depending on the execution | ||
engine and storage format. | ||
""" | ||
if Engine.get() in ["Ray", "Dask"] and StorageFormat.get() == "Pandas": | ||
# Rebalancing partitions is currently only implemented for PandasOnRay and PandasOnDask. | ||
# We rebalance when the ratio of the number of existing partitions to | ||
# the ideal number of partitions is larger than this threshold. The | ||
# threshold is a heuristic that may need to be tuned for performance. | ||
max_excess_of_num_partitions = 1.5 | ||
num_existing_partitions = partitions.shape[0] | ||
ideal_num_new_partitions = NPartitions.get() | ||
if ( | ||
num_existing_partitions | ||
<= ideal_num_new_partitions * max_excess_of_num_partitions | ||
): | ||
return partitions | ||
# If any partition has an unknown length, give each axis partition | ||
# roughly the same number of row partitions. We use `_length_cache` here | ||
# to avoid materializing any unmaterialized lengths. | ||
if any( | ||
partition._length_cache is None | ||
for row in partitions | ||
for partition in row | ||
): | ||
# We need each partition to go into an axis partition, but the | ||
# number of axis partitions may not evenly divide the number of | ||
# partitions. | ||
chunk_size = compute_chunksize( | ||
YarShev marked this conversation as resolved.
Show resolved
Hide resolved
|
||
num_existing_partitions, ideal_num_new_partitions, min_block_size=1 | ||
) | ||
return np.array( | ||
[ | ||
cls.column_partitions( | ||
partitions[i : i + chunk_size], | ||
full_axis=False, | ||
) | ||
for i in range( | ||
0, | ||
num_existing_partitions, | ||
chunk_size, | ||
) | ||
] | ||
) | ||
|
||
# If we know the number of rows in every partition, then we should try | ||
# instead to give each new partition roughly the same number of rows. | ||
new_partitions = [] | ||
# `start` is the index of the first existing partition that we want to | ||
# put into the current new partition. | ||
start = 0 | ||
total_rows = sum(part.length() for part in partitions[:, 0]) | ||
ideal_partition_size = compute_chunksize( | ||
total_rows, ideal_num_new_partitions, min_block_size=1 | ||
) | ||
for _ in range(ideal_num_new_partitions): | ||
# We might pick up old partitions too quickly and exhaust all of them. | ||
if start >= len(partitions): | ||
break | ||
Comment on lines
+1373
to
+1374
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I don't understand when this would happen? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The comment above kind of explains, but basically, if we have very, very small partitions, we may want to coalesce more than we actually have - e.g. if we have rebalanced all but the last three partitions, and those three alone are not enough to make a new partition, we would run off the end of the list before we've satisfied the min constraint for the length of the new partition, and hit this case. |
||
# `stop` is the index of the last existing partition so far that we | ||
# want to put into the current new partition. | ||
stop = start | ||
partition_size = partitions[start][0].length() | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. is There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. We try to cache it, but it will block if it is not cached. If it is blocking, it submits a remote task that gets the length of the object, and only materializes the length in memory on the main node. |
||
# Add existing partitions into the current new partition until the | ||
# number of rows in the new partition hits `ideal_partition_size`. | ||
while stop < len(partitions) and partition_size < ideal_partition_size: | ||
stop += 1 | ||
if stop < len(partitions): | ||
partition_size += partitions[stop][0].length() | ||
# If the new partition is larger than we want, split the last | ||
# current partition that it contains into two partitions, where | ||
# the first partition has just enough rows to make the current | ||
# new partition have length `ideal_partition_size`, and the second | ||
# partition has the remainder. | ||
if partition_size > ideal_partition_size * max_excess_of_num_partitions: | ||
new_last_partition_size = ideal_partition_size - sum( | ||
row[0].length() for row in partitions[start:stop] | ||
) | ||
YarShev marked this conversation as resolved.
Show resolved
Hide resolved
|
||
partitions = np.insert( | ||
partitions, | ||
stop + 1, | ||
[ | ||
obj.mask(slice(new_last_partition_size, None), slice(None)) | ||
for obj in partitions[stop] | ||
], | ||
0, | ||
) | ||
partitions[stop, :] = [ | ||
obj.mask(slice(None, new_last_partition_size), slice(None)) | ||
for obj in partitions[stop] | ||
] | ||
partition_size = ideal_partition_size | ||
new_partitions.append( | ||
cls.column_partitions( | ||
(partitions[start : stop + 1]), | ||
full_axis=partition_size == total_rows, | ||
) | ||
) | ||
start = stop + 1 | ||
return np.array(new_partitions) | ||
return partitions |
Original file line number | Diff line number | Diff line change | ||||
---|---|---|---|---|---|---|
|
@@ -15,6 +15,7 @@ | |||||
|
||||||
from modin.core.dataframe.pandas.dataframe.dataframe import PandasDataframe | ||||||
from ..partitioning.partition_manager import PandasOnDaskDataframePartitionManager | ||||||
from modin.core.execution.dask.common.engine_wrapper import DaskWrapper | ||||||
|
||||||
|
||||||
class PandasOnDaskDataframe(PandasDataframe): | ||||||
|
@@ -41,22 +42,63 @@ class PandasOnDaskDataframe(PandasDataframe): | |||||
|
||||||
_partition_mgr_cls = PandasOnDaskDataframePartitionManager | ||||||
|
||||||
def _get_partition_size_along_axis(self, partition, axis=0): | ||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Per #4494, we are currently calculating partition shapes serially in other places than the one you changed here, e.g. here for Ray, in Ray virtual partitioning In my opinion, we should parallelize getting all the partition shapes correctly in a separate fix for #4494. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Ah, I'm wrong about this function blocking on inner partitions, because it doesn't call There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I think it makes sense to include here, since the original code to get length + width in parallel breaks when applied to axis partitions so I'll need to fix that code anyways, and if I'm doing that, I may as well fix the code to just do it all in parallel right? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. If the original code was broken, I think it's okay to include the partial fix for #4494 here. |
||||||
""" | ||||||
Compute the length along the specified axis of the specified partition. | ||||||
|
||||||
Parameters | ||||||
---------- | ||||||
partition : ``PandasOnDaskDataframeVirtualPartition`` or ``PandasOnDaskDataframePartition`` | ||||||
The partition whose size to compute. | ||||||
axis : int, default: 0 | ||||||
The axis along which to compute size. | ||||||
|
||||||
Returns | ||||||
------- | ||||||
list | ||||||
A list of lengths along the specified axis that sum to the overall length of the partition | ||||||
along the specified axis. | ||||||
|
||||||
Notes | ||||||
----- | ||||||
This utility function is used to ensure that computation occurs asynchronously across all partitions | ||||||
whether the partitions are virtual or physical partitions. | ||||||
""" | ||||||
if isinstance(partition, self._partition_mgr_cls._partition_class): | ||||||
return [ | ||||||
partition.apply( | ||||||
lambda df: len(df) if not axis else len(df.columns) | ||||||
)._data | ||||||
] | ||||||
elif partition.axis == axis: | ||||||
return [ | ||||||
ptn.apply(lambda df: len(df) if not axis else len(df.columns))._data | ||||||
for ptn in partition.list_of_partitions_to_combine | ||||||
] | ||||||
return [ | ||||||
partition.list_of_partitions_to_combine[0] | ||||||
.apply(lambda df: len(df) if not axis else (len(df.columns))) | ||||||
._data | ||||||
] | ||||||
|
||||||
@property | ||||||
def _row_lengths(self): | ||||||
""" | ||||||
Compute the row partitions lengths if they are not cached. | ||||||
Compute ther row partitions lengths if they are not cached. | ||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Suggested change
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. not fixed |
||||||
|
||||||
Returns | ||||||
------- | ||||||
list | ||||||
A list of row partitions lengths. | ||||||
""" | ||||||
if self._row_lengths_cache is None: | ||||||
self._row_lengths_cache = ( | ||||||
self._partition_mgr_cls.get_objects_from_partitions( | ||||||
[obj.apply(lambda df: len(df)) for obj in self._partitions.T[0]] | ||||||
) | ||||||
row_lengths_list = DaskWrapper.materialize( | ||||||
[ | ||||||
YarShev marked this conversation as resolved.
Show resolved
Hide resolved
|
||||||
self._get_partition_size_along_axis(obj, axis=0) | ||||||
for obj in self._partitions.T[0] | ||||||
] | ||||||
) | ||||||
self._row_lengths_cache = [sum(len_list) for len_list in row_lengths_list] | ||||||
return self._row_lengths_cache | ||||||
|
||||||
@property | ||||||
|
@@ -70,12 +112,13 @@ def _column_widths(self): | |||||
A list of column partitions widths. | ||||||
""" | ||||||
if self._column_widths_cache is None: | ||||||
self._column_widths_cache = ( | ||||||
self._partition_mgr_cls.get_objects_from_partitions( | ||||||
[ | ||||||
obj.apply(lambda df: len(df.columns)) | ||||||
for obj in self._partitions[0] | ||||||
] | ||||||
) | ||||||
col_widths_list = DaskWrapper.materialize( | ||||||
YarShev marked this conversation as resolved.
Show resolved
Hide resolved
|
||||||
[ | ||||||
self._get_partition_size_along_axis(obj, axis=1) | ||||||
for obj in self._partitions[0] | ||||||
] | ||||||
) | ||||||
self._column_widths_cache = [ | ||||||
sum(width_list) for width_list in col_widths_list | ||||||
] | ||||||
return self._column_widths_cache |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
qq:
ideal_num_new_partitions
doesn't account for the total number of vCPUs in a cluster right?There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
You can override
NPartitions.get()
to do so. It just asks Ray how many cores there are, and returns that, so as long as your cluster is fully initialized when modin is initialized, and you setIsRayCluster
correctly, it should.