Skip to content

Commit

Permalink
REFACTOR-#3780: Remove code duplication for PandasOnDaskDataframe (#…
Browse files Browse the repository at this point in the history
…3781)

Signed-off-by: Anatoly Myachev <[email protected]>
  • Loading branch information
anmyachev authored Sep 1, 2022
1 parent 5086a9e commit 7319890
Show file tree
Hide file tree
Showing 2 changed files with 1 addition and 84 deletions.
1 change: 1 addition & 0 deletions docs/release_notes/release_notes-0.16.0.rst
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,7 @@ Key Features and Updates
* REFACTOR-#4832: unify `split_result_of_axis_func_pandas` (#4831)
* REFACTOR-#4796: Introduce constant for __reduced__ column name (#4799)
* REFACTOR-#4000: Remove code duplication for `PandasOnRayDataframePartitionManager` (#4895)
* REFACTOR-#3780: Remove code duplication for `PandasOnDaskDataframe` (#3781)
* REFACTOR-#4530: Unify access to physical data for any partition type (#4829)
* Pandas API implementations and improvements
* FEAT-#4670: Implement convert_dtypes by mapping across partitions (#4671)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@

from modin.core.dataframe.pandas.dataframe.dataframe import PandasDataframe
from ..partitioning.partition_manager import PandasOnDaskDataframePartitionManager
from modin.core.execution.dask.common.engine_wrapper import DaskWrapper


class PandasOnDaskDataframe(PandasDataframe):
Expand All @@ -41,86 +40,3 @@ class PandasOnDaskDataframe(PandasDataframe):
"""

_partition_mgr_cls = PandasOnDaskDataframePartitionManager

def _get_partition_size_along_axis(self, partition, axis=0):
"""
Compute the length along the specified axis of the specified partition.
Parameters
----------
partition : ``PandasOnDaskDataframeVirtualPartition`` or ``PandasOnDaskDataframePartition``
The partition whose size to compute.
axis : int, default: 0
The axis along which to compute size.
Returns
-------
list
A list of lengths along the specified axis that sum to the overall length of the partition
along the specified axis.
Notes
-----
This utility function is used to ensure that computation occurs asynchronously across all partitions
whether the partitions are virtual or physical partitions.
"""
if isinstance(partition, self._partition_mgr_cls._partition_class):
return [
partition.apply(
lambda df: len(df) if not axis else len(df.columns)
).list_of_blocks[0]
]
elif partition.axis == axis:
return [
ptn.apply(
lambda df: len(df) if not axis else len(df.columns)
).list_of_blocks[0]
for ptn in partition.list_of_block_partitions
]
return [
partition.list_of_block_partitions[0]
.apply(lambda df: len(df) if not axis else (len(df.columns)))
.list_of_blocks[0]
]

@property
def _row_lengths(self):
"""
Compute ther row partitions lengths if they are not cached.
Returns
-------
list
A list of row partitions lengths.
"""
if self._row_lengths_cache is None:
row_lengths_list = DaskWrapper.materialize(
[
self._get_partition_size_along_axis(obj, axis=0)
for obj in self._partitions.T[0]
]
)
self._row_lengths_cache = [sum(len_list) for len_list in row_lengths_list]
return self._row_lengths_cache

@property
def _column_widths(self):
"""
Compute the column partitions widths if they are not cached.
Returns
-------
list
A list of column partitions widths.
"""
if self._column_widths_cache is None:
col_widths_list = DaskWrapper.materialize(
[
self._get_partition_size_along_axis(obj, axis=1)
for obj in self._partitions[0]
]
)
self._column_widths_cache = [
sum(width_list) for width_list in col_widths_list
]
return self._column_widths_cache

0 comments on commit 7319890

Please sign in to comment.