updates
Signed-off-by: Anatoly Myachev <[email protected]>
anmyachev committed May 10, 2024
1 parent c597f7f commit 9562144
Showing 1 changed file with 39 additions and 6 deletions.
45 changes: 39 additions & 6 deletions modin/core/dataframe/pandas/dataframe/dataframe.py
@@ -108,7 +108,7 @@ class PandasDataframe(
_index_cache: ModinIndex = None
_columns_cache: ModinIndex = None
_dtypes: Optional[ModinDtypes] = None
_pandas_backend: str = None
_pandas_backend: Optional[str] = None

@cached_property
def __constructor__(self) -> type[PandasDataframe]:
@@ -1321,6 +1321,7 @@ def _take_2d_positional(
new_row_lengths,
new_col_widths,
new_dtypes,
# CHECKED: backend preserved
pandas_backend=self._pandas_backend,
)

@@ -1460,7 +1461,9 @@ def from_labels(self) -> PandasDataframe:
new_column_names = pandas.Index(level_names, tupleize_cols=False)
new_columns = new_column_names.append(self.columns)

def from_labels_executor(df, **kwargs):
def from_labels_executor(
df: pandas.DataFrame, **kwargs
) -> pandas.DataFrame: # pragma: no cover
# Setting the names here ensures that external and internal metadata always match.
df.index.names = new_column_names

@@ -1496,6 +1499,7 @@ def from_labels_executor(df, **kwargs):
row_lengths=self._row_lengths_cache,
column_widths=new_column_widths,
dtypes=new_dtypes,
# CHECKED: backend preserved
pandas_backend=self._pandas_backend,
)
# Set flag for propagating deferred row labels across dataframe partitions
@@ -1629,6 +1633,7 @@ def _reorder_labels(self, row_positions=None, col_positions=None):
new_lengths,
new_widths,
new_dtypes,
# CHECKED: backend preserved
pandas_backend=self._pandas_backend,
)

@@ -1649,6 +1654,7 @@ def copy(self):
self._row_lengths_cache,
self._column_widths_cache,
self.copy_dtypes_cache(),
# CHECKED: backend preserved
pandas_backend=self._pandas_backend,
)

@@ -1754,6 +1760,7 @@ def astype_builder(df):
self._row_lengths_cache,
self._column_widths_cache,
new_dtypes,
# TODO: backend can be changed
pandas_backend=self._pandas_backend,
)

@@ -2115,6 +2122,7 @@ def _compute_tree_reduce_metadata(self, axis, new_parts, dtypes=None):
*new_axes,
*new_axes_lengths,
dtypes,
# CHECKED: backend may be changed depending on function
pandas_backend=self._pandas_backend,
)
return result
@@ -2301,6 +2309,7 @@ def map(
self._row_lengths_cache,
self._column_widths_cache,
dtypes=dtypes,
# CHECKED: backend may be changed depending on function
pandas_backend=self._pandas_backend,
)

@@ -2381,6 +2390,7 @@ def fold(self, axis, func, new_columns=None):
self.copy_columns_cache(copy_lengths=True),
self._row_lengths_cache,
self._column_widths_cache,
# CHECKED: backend may be changed depending on function
pandas_backend=self._pandas_backend,
)

@@ -2428,6 +2438,7 @@ def infer_types(self, col_labels: List[str]) -> PandasDataframe:
self._row_lengths_cache,
self._column_widths_cache,
new_dtypes,
# CHECKED: backend may be changed depending on `new_cols_dtypes`
pandas_backend=self._pandas_backend,
)

@@ -2534,6 +2545,7 @@ def combine_and_apply(
self._row_lengths_cache,
[len(self.columns)] if self.has_materialized_columns else None,
self.copy_dtypes_cache(),
# CHECKED: backend preserved
pandas_backend=self._pandas_backend,
)
else:
@@ -2838,6 +2850,7 @@ def filter(self, axis: Union[Axis, int], condition: Callable) -> PandasDataframe
*new_axes,
*new_lengths,
self.copy_dtypes_cache() if axis == Axis.COL_WISE else None,
# CHECKED: backend preserved
pandas_backend=self._pandas_backend,
)

@@ -2897,6 +2910,7 @@ def explode(self, axis: Union[int, Axis], func: Callable) -> PandasDataframe:
new_columns,
row_lengths,
column_widths,
# TODO: need check
pandas_backend=self._pandas_backend,
)

@@ -2925,6 +2939,7 @@ def combine(self) -> PandasDataframe:
else None
),
dtypes=self.copy_dtypes_cache(),
# CHECKED: backend preserved
pandas_backend=self._pandas_backend,
)
result.synchronize_labels()
@@ -3081,6 +3096,7 @@ def apply_full_axis_select_indices(
None,
None,
dtypes=new_dtypes,
# CHECKED: backend may be changed depending on function
pandas_backend=self._pandas_backend,
)

@@ -3176,6 +3192,7 @@ def apply_select_indices(
lengths_objs[0],
lengths_objs[1],
new_dtypes,
# CHECKED: backend may be changed depending on function
pandas_backend=self._pandas_backend,
)
else:
@@ -3204,6 +3221,7 @@ def apply_select_indices(
self._row_lengths_cache,
self._column_widths_cache,
new_dtypes,
# CHECKED: backend may be changed depending on function
pandas_backend=self._pandas_backend,
)

@@ -3310,6 +3328,7 @@ def _pick_axis(get_axis, sizes_cache):
new_row_lengths,
new_column_widths,
dtypes=dtypes,
# CHECKED: backend may be changed depending on function
pandas_backend=self._pandas_backend,
)

@@ -3375,14 +3394,14 @@ def broadcast_apply_select_indices(
self,
axis,
func,
other,
other: PandasDataframe,
apply_indices=None,
numeric_indices=None,
keep_remaining=False,
broadcast_all=True,
new_index=None,
new_columns=None,
):
) -> PandasDataframe:
"""
Apply a function to select indices at specified axis and broadcast partitions of `other` Modin DataFrame.
@@ -3452,6 +3471,7 @@ def broadcast_apply_select_indices(
new_partitions,
index=new_index,
columns=new_columns,
# CHECKED: backend may be changed depending on function
pandas_backend=self._pandas_backend,
)

@@ -3576,13 +3596,15 @@ def broadcast_apply_full_axis(
else:
if new_columns is None:
assert not is_list_like(dtypes)
# CHECKED: backend may be changed depending on function
dtype = self.construct_dtype(dtypes, self._pandas_backend)
kw["dtypes"] = ModinDtypes(DtypesDescriptor(remaining_dtype=dtype))
else:
kw["dtypes"] = (
pandas.Series(dtypes, index=new_columns)
if is_list_like(dtypes)
else pandas.Series(
# CHECKED: backend may be changed depending on function
[self.construct_dtype(dtypes, self._pandas_backend)]
* len(new_columns),
index=new_columns,
@@ -3652,6 +3674,7 @@ def broadcast_apply_full_axis(
index=new_index,
columns=new_columns,
**kw,
# CHECKED: backend may be changed depending on function
pandas_backend=self._pandas_backend,
)
if sync_labels and new_index is not None:
@@ -3832,12 +3855,12 @@ def _copartition(
def n_ary_op(
self,
op,
right_frames: list,
right_frames: list[PandasDataframe],
join_type="outer",
copartition_along_columns=True,
labels="replace",
dtypes: Optional[pandas.Series] = None,
):
) -> PandasDataframe:
"""
Perform an n-ary operation by joining with other Modin DataFrame(s).
@@ -3874,6 +3897,7 @@ def n_ary_op(
self.copy_columns_cache(copy_lengths=True),
row_lengths,
self._column_widths_cache,
# CHECKED: backend preserved
pandas_backend=self._pandas_backend,
)
new_right_frames = [
@@ -3883,6 +3907,7 @@
right_frame.copy_columns_cache(copy_lengths=True),
row_lengths,
right_frame._column_widths_cache,
# CHECKED: backend preserved
pandas_backend=self._pandas_backend,
)
for right_parts, right_frame in zip(list_of_right_parts, right_frames)
@@ -3921,6 +3946,7 @@ def n_ary_op(
row_lengths,
column_widths,
dtypes,
# CHECKED: backend may be changed depending on function
pandas_backend=self._pandas_backend,
)

@@ -4055,6 +4081,7 @@ def _compute_new_widths():
new_lengths,
new_widths,
new_dtypes,
# CHECKED: backend preserved
pandas_backend=self._pandas_backend,
)

@@ -4130,6 +4157,7 @@ def _apply_func_to_range_partitioning_broadcast(
index=new_index,
columns=new_columns,
dtypes=new_dtypes,
# CHECKED: backend may be changed depending on function
pandas_backend=self._pandas_backend,
)

@@ -4479,6 +4507,7 @@ def join_cols(df, *cols):
new_partitions,
index=result.copy_index_cache(),
row_lengths=result._row_lengths_cache,
# CHECKED: backend may be changed depending on function
pandas_backend=self._pandas_backend,
)

@@ -4559,6 +4588,7 @@ def groupby_reduce(
new_partitions,
index=new_index,
columns=new_columns,
# CHECKED: backend may be changed depending on function
pandas_backend=self._pandas_backend,
)

@@ -4744,6 +4774,7 @@ def transpose(self):
self._column_widths_cache,
self._row_lengths_cache,
dtypes=new_dtypes,
# TODO: backend preserved?
pandas_backend=self._pandas_backend,
)

@@ -4932,6 +4963,7 @@ def remote_fn(df, name, caselist): # pragma: no cover
columns,
row_lengths,
column_widths,
# CHECKED: backend may be changed depending on function
pandas_backend=self._pandas_backend,
)
for part in list_of_right_parts
@@ -5004,5 +5036,6 @@ def map_data(
index=self.index,
row_lengths=lengths,
column_widths=[1],
# CHECKED: backend may be changed depending on function
pandas_backend=self._pandas_backend,
)
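
Each constructor call touched in this diff threads the parent frame's `_pandas_backend` into `__constructor__`, and the added comments record whether that backend is guaranteed to be preserved or may change once an arbitrary function has run. The snippet below is a minimal, self-contained sketch of that propagation pattern; the `Frame` class is hypothetical and stands in for the real `PandasDataframe`, which takes many more constructor arguments.

from typing import Callable, Optional


class Frame:
    """Toy stand-in for a partitioned dataframe that tracks its pandas backend."""

    def __init__(self, data: list, pandas_backend: Optional[str] = None) -> None:
        self._data = data
        # None means default NumPy-backed pandas; "pyarrow" is one possible alternative.
        self._pandas_backend = pandas_backend

    def copy(self) -> "Frame":
        # Metadata-only operation: the backend is preserved by construction.
        return type(self)(list(self._data), pandas_backend=self._pandas_backend)

    def map(self, func: Callable) -> "Frame":
        # An arbitrary function may return data with a different backend, so passing
        # the parent's value here is only a best-effort default.
        return type(self)([func(x) for x in self._data], pandas_backend=self._pandas_backend)


f = Frame([1, 2, 3], pandas_backend="pyarrow")
assert f.copy()._pandas_backend == "pyarrow"
assert f.map(lambda x: x + 1)._pandas_backend == "pyarrow"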
