Skip to content

Commit

Permalink
cleanup
Browse files Browse the repository at this point in the history
Signed-off-by: Anatoly Myachev <[email protected]>
  • Loading branch information
anmyachev committed May 13, 2024
1 parent 22f2db6 commit acc20b3
Show file tree
Hide file tree
Showing 4 changed files with 28 additions and 23 deletions.
1 change: 1 addition & 0 deletions modin/core/storage_formats/pandas/aggregations.py
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ def corr_method(
min_periods: int = 1,
numeric_only: bool = True,
) -> PandasQueryCompiler:
# The implementation below is designed for the default pandas backend (numpy)
if method != "pearson" or qc.get_backend() == "pyarrow":
return super(type(qc), qc).corr(
method=method, min_periods=min_periods, numeric_only=numeric_only
Expand Down
21 changes: 6 additions & 15 deletions modin/core/storage_formats/pandas/query_compiler.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,6 @@
is_datetime64_any_dtype,
is_list_like,
is_numeric_dtype,
is_timedelta64_dtype,
)
from pandas.core.groupby.base import transformation_kernels
from pandas.core.indexes.api import ensure_index_from_sequences
Expand Down Expand Up @@ -1855,7 +1854,6 @@ def isin_func(df, values):

abs = Map.register(pandas.DataFrame.abs, dtypes="copy")
map = Map.register(pandas.DataFrame.map)
# Will it work with pyarrow backend?
conj = Map.register(lambda df, *args, **kwargs: pandas.DataFrame(np.conj(df)))

def convert_dtypes(
Expand All @@ -1876,13 +1874,14 @@ def convert_dtypes(
convert_floating=convert_floating,
dtype_backend=dtype_backend,
)
# TODO: `numpy_nullable` should be handled similarly
if dtype_backend == "pyarrow":
result._modin_frame._pandas_backend = "pyarrow"
return result

invert = Map.register(pandas.DataFrame.__invert__, dtypes="copy")
isna = Map.register(pandas.DataFrame.isna, dtypes=np.bool_)
# better way to distinguish methods for NumPy API?
# TODO: better way to distinguish methods for NumPy API?
_isfinite = Map.register(
lambda df, *args, **kwargs: pandas.DataFrame(np.isfinite(df, *args, **kwargs)),
dtypes=np.bool_,
Expand Down Expand Up @@ -2272,7 +2271,7 @@ def clip(self, lower, upper, **kwargs):
corr = CorrCovBuilder.build_corr_method()

def cov(self, min_periods=None, ddof=1):
if self._modin_frame._pandas_backend == "pyarrow":
if self.get_backend() == "pyarrow":
return super().cov(min_periods=min_periods, ddof=ddof)
# _nancorr uses numpy, which is incompatible with pandas dataframes on pyarrow
return self._nancorr(min_periods=min_periods, cov=True, ddof=ddof)
Expand Down Expand Up @@ -2642,11 +2641,7 @@ def quantile_for_list_of_values(self, **kwargs):
new_columns = [
col
for col, dtype in zip(self.columns, self.dtypes)
if (
is_numeric_dtype(dtype)
or is_timedelta64_dtype(dtype)
or is_datetime64_any_dtype(dtype)
)
if (is_numeric_dtype(dtype) or lib.is_np_dtype(dtype, "mM"))
]
if axis == 1:
query_compiler = self.getitem_column_array(new_columns)
Expand Down Expand Up @@ -2841,7 +2836,6 @@ def applyier(df, internal_indices, other=[], internal_other_indices=[]):

# __getitem__ methods
__getitem_bool = Binary.register(
# TODO: `is_scalar` don't work with pyarrow scalars
lambda df, r: df[[r]] if is_scalar(r) else df[r],
join_type="left",
labels="drop",
Expand Down Expand Up @@ -4532,20 +4526,17 @@ def iloc_mut(partition, row_internal_indices, col_internal_indices, item):
pandas.DataFrame
Partition data with updated values.
"""
partition = partition.copy()
try:
partition.iloc[row_internal_indices, col_internal_indices] = item
except ValueError:
# maybe make a copy only if there is an exception?
partition = partition.copy()
# `copy` is needed to avoid "ValueError: buffer source array is read-only" for `item`
# because the item may be converted to the type that is in the dataframe.
# TODO: in the future we will need to convert to the correct type manually according
# to the following warning. Example: "FutureWarning: Setting an item of incompatible
# dtype is deprecated and will raise in a future error of pandas. Value '[1.38629436]'
# has dtype incompatible with int64, please explicitly cast to a compatible dtype first."
partition.iloc[row_internal_indices, col_internal_indices] = (
item.copy() if hasattr(item, "copy") else item
)
partition.iloc[row_internal_indices, col_internal_indices] = item.copy()
return partition

if not is_scalar(item):
Expand Down
14 changes: 13 additions & 1 deletion modin/pandas/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,19 @@ def is_scalar(obj):


def get_pandas_backend(dtypes: pandas.Series) -> str | None:
"""
Determine the backend based on the `dtypes`.
Parameters
----------
dtypes : pandas.Series
DataFrame dtypes.
Returns
-------
str | None
Backend name.
"""
backend = None
if any(isinstance(x, pandas.ArrowDtype) for x in dtypes):
backend = "pyarrow"
Expand Down Expand Up @@ -306,7 +319,6 @@ def broadcast_item(
try:
# Cast to numpy drop information about heterogeneous types (cast to common)
# TODO: we shouldn't do that, maybe there should be the if branch
# TODO: what if item comes from pyarrow
item = np.array(item)
if dtypes is None:
dtypes = pandas.Series([item.dtype] * len(col_lookup))
Expand Down
15 changes: 8 additions & 7 deletions modin/tests/pandas/dataframe/test_default.py
Original file line number Diff line number Diff line change
Expand Up @@ -250,15 +250,16 @@ def test_combine_first():

class TestCorr:
@pytest.mark.parametrize("method", ["pearson", "kendall", "spearman"])
def test_corr(self, method):
@pytest.mark.parametrize("backend", [None, "pyarrow"])
def test_corr(self, method, backend):
eval_general(
*create_test_dfs(test_data["int_data"]),
*create_test_dfs(test_data["int_data"], backend=backend),
lambda df: df.corr(method=method),
)
# Modin result may slightly differ from pandas result
# due to floating-point arithmetic.
eval_general(
*create_test_dfs(test_data["float_nan_data"]),
*create_test_dfs(test_data["float_nan_data"], backend=backend),
lambda df: df.corr(method=method),
comparator=modin_df_almost_equals_pandas,
)
Expand Down Expand Up @@ -352,7 +353,8 @@ def test_corr_nans_in_different_partitions(self):

@pytest.mark.parametrize("min_periods", [1, 3, 5], ids=lambda x: f"min_periods={x}")
@pytest.mark.parametrize("ddof", [1, 2, 4], ids=lambda x: f"ddof={x}")
def test_cov(min_periods, ddof):
@pytest.mark.parametrize("backend", [None, "pyarrow"])
def test_cov(min_periods, ddof, backend):
# Modin result may slightly differ from pandas result
# due to floating-point arithmetic.
if StorageFormat.get() == "Hdk":
Expand All @@ -366,13 +368,13 @@ def comparator1(df1, df2):
comparator2 = modin_df_almost_equals_pandas

eval_general(
*create_test_dfs(test_data["int_data"]),
*create_test_dfs(test_data["int_data"], backend=backend),
lambda df: df.cov(min_periods=min_periods, ddof=ddof),
comparator=comparator1,
)

eval_general(
*create_test_dfs(test_data["float_nan_data"]),
*create_test_dfs(test_data["float_nan_data"], backend=backend),
lambda df: df.cov(min_periods=min_periods),
comparator=comparator2,
)
Expand Down Expand Up @@ -634,7 +636,6 @@ def test_pivot(data, index, columns, values, request):
expected_exception = ValueError(
"Index contains duplicate entries, cannot reshape"
)
# failed because pandas doesn't preserve dtype backend
eval_general(
*create_test_dfs(data),
lambda df, *args, **kwargs: df.pivot(*args, **kwargs),
Expand Down

0 comments on commit acc20b3

Please sign in to comment.