Skip to content

Commit

Permalink
Merge remote-tracking branch 'upstream/master' into rehan/virtual_par…
Browse files Browse the repository at this point in the history
…titioning_dask

Signed-off-by: Rehan Durrani <[email protected]>
  • Loading branch information
RehanSD committed Jun 15, 2022
2 parents 17a9c65 + 4ec7f63 commit 348272d
Show file tree
Hide file tree
Showing 20 changed files with 152 additions and 83 deletions.
4 changes: 2 additions & 2 deletions asv_bench/benchmarks/utils/common.py
Original file line number Diff line number Diff line change
Expand Up @@ -513,11 +513,11 @@ def execute(
if ASV_USE_ENGINE == "ray":
from ray import wait

all(map(lambda partition: wait([partition.oid]), partitions))
all(map(lambda partition: wait([partition._data]), partitions))
elif ASV_USE_ENGINE == "dask":
from dask.distributed import wait

all(map(lambda partition: wait(partition.future), partitions))
all(map(lambda partition: wait(partition._data), partitions))
elif ASV_USE_ENGINE == "python":
pass

Expand Down
7 changes: 4 additions & 3 deletions docs/release_notes/release_notes-0.16.0.rst
Original file line number Diff line number Diff line change
Expand Up @@ -6,13 +6,13 @@ Key Features and Updates
------------------------

* Stability and Bugfixes
*
* FIX-#4543: Fix `read_csv` in case skiprows=<0, []> (#4544)
* Performance enhancements
*
* Benchmarking enhancements
*
* Refactor Codebase
*
* REFACTOR-#4530: Standardize access to physical data in partitions (#4563)
* Pandas API implementations and improvements
*
* OmniSci enhancements
Expand All @@ -32,4 +32,5 @@ Key Features and Updates

Contributors
------------
@mvashishtha
@mvashishtha
@prutskov
1 change: 1 addition & 0 deletions modin/core/dataframe/pandas/partitioning/partition.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ class PandasDataframePartition(ABC): # pragma: no cover

_length_cache = None
_width_cache = None
_data = None

def get(self):
"""
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ class PandasOnDaskDataframePartition(PandasDataframePartition):
Parameters
----------
future : distributed.Future
data : distributed.Future
A reference to pandas DataFrame that need to be wrapped with this class.
length : distributed.Future or int, optional
Length or reference to it of wrapped pandas DataFrame.
Expand All @@ -40,9 +40,9 @@ class PandasOnDaskDataframePartition(PandasDataframePartition):
Call queue that needs to be executed on wrapped pandas DataFrame.
"""

def __init__(self, future, length=None, width=None, ip=None, call_queue=None):
assert isinstance(future, Future)
self.future = future
def __init__(self, data, length=None, width=None, ip=None, call_queue=None):
assert isinstance(data, Future)
self._data = data
if call_queue is None:
call_queue = []
self.call_queue = call_queue
Expand All @@ -60,7 +60,7 @@ def get(self):
The object from the distributed memory.
"""
self.drain_call_queue()
return DaskWrapper.materialize(self.future)
return DaskWrapper.materialize(self._data)

def apply(self, func, *args, **kwargs):
"""
Expand All @@ -87,15 +87,19 @@ def apply(self, func, *args, **kwargs):
call_queue = self.call_queue + [[func, args, kwargs]]
if len(call_queue) > 1:
futures = DaskWrapper.deploy(
apply_list_of_funcs, call_queue, self.future, num_returns=2, pure=False
apply_list_of_funcs,
call_queue,
self._data,
num_returns=2,
pure=False,
)
else:
# We handle `len(call_queue) == 1` in a different way because
# this improves performance a bit.
func, args, kwargs = call_queue[0]
futures = DaskWrapper.deploy(
apply_func,
self.future,
self._data,
func,
*args,
num_returns=2,
Expand Down Expand Up @@ -127,7 +131,7 @@ def add_to_apply_calls(self, func, *args, **kwargs):
The keyword arguments are sent as a dictionary.
"""
return PandasOnDaskDataframePartition(
self.future, call_queue=self.call_queue + [[func, args, kwargs]]
self._data, call_queue=self.call_queue + [[func, args, kwargs]]
)

def drain_call_queue(self):
Expand All @@ -137,29 +141,33 @@ def drain_call_queue(self):
call_queue = self.call_queue
if len(call_queue) > 1:
futures = DaskWrapper.deploy(
apply_list_of_funcs, call_queue, self.future, num_returns=2, pure=False
apply_list_of_funcs,
call_queue,
self._data,
num_returns=2,
pure=False,
)
else:
# We handle `len(call_queue) == 1` in a different way because
# this improves performance a bit.
func, args, kwargs = call_queue[0]
futures = DaskWrapper.deploy(
apply_func,
self.future,
self._data,
func,
*args,
num_returns=2,
pure=False,
**kwargs,
)
self.future = futures[0]
self._data = futures[0]
self._ip_cache = futures[1]
self.call_queue = []

def wait(self):
"""Wait completing computations on the object wrapped by the partition."""
self.drain_call_queue()
wait(self.future)
wait(self._data)

def mask(self, row_labels, col_labels):
"""
Expand Down Expand Up @@ -198,7 +206,7 @@ def __copy__(self):
A copy of this partition.
"""
return PandasOnDaskDataframePartition(
self.future,
self._data,
length=self._length_cache,
width=self._width_cache,
ip=self._ip_cache,
Expand Down Expand Up @@ -249,7 +257,7 @@ def length(self):
The length of the object.
"""
if self._length_cache is None:
self._length_cache = self.apply(lambda df: len(df)).future
self._length_cache = self.apply(lambda df: len(df))._data
if isinstance(self._length_cache, Future):
self._length_cache = DaskWrapper.materialize(self._length_cache)
return self._length_cache
Expand All @@ -264,7 +272,7 @@ def width(self):
The width of the object.
"""
if self._width_cache is None:
self._width_cache = self.apply(lambda df: len(df.columns)).future
self._width_cache = self.apply(lambda df: len(df.columns))._data
if isinstance(self._width_cache, Future):
self._width_cache = DaskWrapper.materialize(self._width_cache)
return self._width_cache
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,4 +47,4 @@ def get_objects_from_partitions(cls, partitions):
list
The objects wrapped by `partitions`.
"""
return DaskWrapper.materialize([partition.future for partition in partitions])
return DaskWrapper.materialize([partition._data for partition in partitions])
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ class PandasOnPythonDataframePartition(PandasDataframePartition):
"""

def __init__(self, data, length=None, width=None, call_queue=None):
self.data = data
self._data = data
if call_queue is None:
call_queue = []
self.call_queue = call_queue
Expand All @@ -63,7 +63,7 @@ def get(self):
Since this object is a simple wrapper, just return the copy of data.
"""
self.drain_call_queue()
return self.data.copy()
return self._data.copy()

def apply(self, func, *args, **kwargs):
"""
Expand Down Expand Up @@ -108,9 +108,11 @@ def call_queue_closure(data, call_queue):
raise e
return result

self.data = call_queue_closure(self.data, self.call_queue)
self._data = call_queue_closure(self._data, self.call_queue)
self.call_queue = []
return PandasOnPythonDataframePartition(func(self.data.copy(), *args, **kwargs))
return PandasOnPythonDataframePartition(
func(self._data.copy(), *args, **kwargs)
)

def add_to_apply_calls(self, func, *args, **kwargs):
"""
Expand All @@ -131,7 +133,8 @@ def add_to_apply_calls(self, func, *args, **kwargs):
New ``PandasOnPythonDataframePartition`` object with extended call queue.
"""
return PandasOnPythonDataframePartition(
self.data.copy(), call_queue=self.call_queue + [(func, args, kwargs)]
self._data.copy(),
call_queue=self.call_queue + [(func, args, kwargs)],
)

def drain_call_queue(self):
Expand Down Expand Up @@ -197,7 +200,7 @@ def length(self):
The length of the object.
"""
if self._length_cache is None:
self._length_cache = self.apply(self._length_extraction_fn()).data
self._length_cache = self.apply(self._length_extraction_fn())._data
return self._length_cache

def width(self):
Expand All @@ -210,5 +213,5 @@ def width(self):
The width of the object.
"""
if self._width_cache is None:
self._width_cache = self.apply(self._width_extraction_fn()).data
self._width_cache = self.apply(self._width_extraction_fn())._data
return self._width_cache
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ def __init__(self, list_of_blocks, full_axis: bool = True):
for obj in list_of_blocks:
obj.drain_call_queue()
# Unwrap from PandasDataframePartition object for ease of use
self.list_of_blocks = [obj.data for obj in list_of_blocks]
self.list_of_blocks = [obj._data for obj in list_of_blocks]

partition_type = PandasOnPythonDataframePartition
instance_type = pandas.DataFrame
Expand Down
2 changes: 1 addition & 1 deletion modin/core/execution/ray/generic/modin_aqp.py
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ def call_progress_bar(result_parts, line_no):
except AttributeError:
return
pbar_id = str(cell_no) + "-" + str(line_no)
futures = [x.oid for row in result_parts for x in row]
futures = [x._data for row in result_parts for x in row]
bar_format = (
"{l_bar}{bar}{r_bar}"
if "DEBUG_PROGRESS_BAR" in os.environ
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ def to_numpy(cls, partitions, **kwargs):
"""
parts = ray.get(
[
obj.apply(lambda df, **kwargs: df.to_numpy(**kwargs)).oid
obj.apply(lambda df, **kwargs: df.to_numpy(**kwargs))._data
for row in partitions
for obj in row
]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -235,7 +235,7 @@ def func(df, **kw):
max_retries=0,
)
# pending completion
ray.get([partition.oid for partition in result.flatten()])
ray.get([partition._data for partition in result.flatten()])

@staticmethod
def _to_parquet_check_support(kwargs):
Expand Down Expand Up @@ -308,4 +308,4 @@ def func(df, **kw):
lengths=None,
enumerate_partitions=True,
)
ray.get([part.oid for row in result for part in row])
ray.get([part._data for row in result for part in row])
Loading

0 comments on commit 348272d

Please sign in to comment.