Release v0.12.0 #297

Merged: 6 commits, May 1, 2024
13 changes: 7 additions & 6 deletions .github/workflows/test-package.yml
@@ -19,11 +19,10 @@ jobs:
strategy:
fail-fast: false
matrix:
python-version: [3.8, 3.9, '3.10', '3.11']
spark-version: [3.1.3, 3.2.4, 3.3.4, 3.4.2, 3.5.0]
python-version: [3.9, '3.10', '3.11']
spark-version: [3.2.4, 3.3.4, 3.4.2, 3.5.1]
pandas-version: [2.2.2, 1.5.3]
exclude:
- python-version: '3.11'
spark-version: 3.1.3
- python-version: '3.11'
spark-version: 3.2.4
- python-version: '3.11'
@@ -51,6 +50,7 @@ jobs:
python -m pip install --upgrade pip
python -m pip install pytest pytest-spark pypandoc
python -m pip install pyspark==${{ matrix.spark-version }}
python -m pip install pandas==${{ matrix.pandas-version }}
python -m pip install .[dev]
- name: Test with pytest
run: |
@@ -62,7 +62,8 @@
strategy:
fail-fast: false
matrix:
python-version: [3.8, 3.9, '3.10', '3.11']
python-version: [3.9, '3.10', '3.11']

env:
PYTHON_VERSION: ${{ matrix.python-version }}

@@ -88,7 +89,7 @@ jobs:
strategy:
fail-fast: false
matrix:
python-version: [3.8, 3.9, '3.10', '3.11']
python-version: [3.9, '3.10', '3.11']
env:
PYTHON_VERSION: ${{ matrix.python-version }}

3 changes: 2 additions & 1 deletion CONTRIBUTORS
@@ -3,4 +3,5 @@
- Usman Azhar
- Mark Zhou
- Ian Whitestone
- Faisal Dosani
- Faisal Dosani
- Lorenzo Mercado
48 changes: 38 additions & 10 deletions README.md
@@ -38,16 +38,44 @@ pip install datacompy[ray]

```

### In-scope Spark versions
Different versions of Spark play nicely with only certain versions of Python below is a matrix of what we test with
### Legacy Spark Deprecation

#### Starting with version 0.12.0

The original ``SparkCompare`` implementation differs from all the other native implementations. To better align the API and keep behaviour consistent, we are deprecating the original ``SparkCompare`` and moving it into a new module, ``LegacySparkCompare``.

If you wish to keep using the old ``SparkCompare`` going forward, import it from the legacy module:

```python
from datacompy.legacy import LegacySparkCompare
```
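
For illustration only, a minimal usage sketch of the legacy comparator. The constructor shown here (a SparkSession plus base/compare DataFrames and ``join_columns``) reflects the pre-0.12.0 ``SparkCompare`` signature and should be read as an assumption rather than a reference:

```python
from pyspark.sql import SparkSession

from datacompy.legacy import LegacySparkCompare

spark = SparkSession.builder.getOrCreate()

base_df = spark.createDataFrame([(1, "a"), (2, "b")], ["id", "value"])
compare_df = spark.createDataFrame([(1, "a"), (2, "c")], ["id", "value"])

# Assumed legacy signature: the session first, then the two Spark DataFrames.
comparison = LegacySparkCompare(spark, base_df, compare_df, join_columns=["id"])
comparison.report()  # the legacy report writes to stdout by default
```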

#### Supported versions and dependencies

Different versions of Spark, Pandas, and Python interact differently. Below is a matrix of what we test with.
With the move to the Pandas on Spark API and compatibility issues with Pandas 2+, we will, for the time being, not support Pandas 2
with the Pandas on Spark implementation. Spark plans to support Pandas 2 in [Spark 4](https://issues.apache.org/jira/browse/SPARK-44101).

With version ``0.12.0``:
- Pandas ``2.0.0`` and above are not supported for the native Spark implementation
- Spark ``3.1`` support is dropped
- Python ``3.8`` support is dropped


| | Spark 3.2.4 | Spark 3.3.4 | Spark 3.4.2 | Spark 3.5.1 |
|-------------|-------------|-------------|-------------|-------------|
| Python 3.9 | ✅ | ✅ | ✅ | ✅ |
| Python 3.10 | ✅ | ✅ | ✅ | ✅ |
| Python 3.11 | ❌ | ❌ | ✅ | ✅ |
| Python 3.12 | ❌ | ❌ | ❌ | ❌ |


| | Pandas < 1.5.3 | Pandas >=2.0.0 |
|---------------|----------------|----------------|
| Native Pandas | ✅ | ✅ |
| Native Spark | ✅ | ❌ |
| Fugue | ✅ | ✅ |
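
As a hedged illustration of the Pandas on Spark path described above, the sketch below assumes the refreshed ``SparkCompare`` mirrors the pandas ``Compare`` interface and accepts ``pyspark.pandas`` DataFrames; the exact constructor and methods are assumptions, not a reference:

```python
import pyspark.pandas as ps

import datacompy

# Per the matrix above, this path currently requires pandas < 2.
df1 = ps.DataFrame({"id": [1, 2, 3], "value": [1.0, 2.0, 3.0]})
df2 = ps.DataFrame({"id": [1, 2, 3], "value": [1.0, 2.0, 3.1]})

# Assumed to mirror datacompy.Compare: join on the key column(s), then report.
comparison = datacompy.SparkCompare(df1, df2, join_columns="id")
print(comparison.report())
```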

| | Spark 3.1.3 | Spark 3.2.3 | Spark 3.3.4 | Spark 3.4.2 | Spark 3.5.0 |
|-------------|--------------|-------------|-------------|-------------|-------------|
| Python 3.8 | ✅ | ✅ | ✅ | ✅ | ✅ |
| Python 3.9 | ✅ | ✅ | ✅ | ✅ | ✅ |
| Python 3.10 | ✅ | ✅ | ✅ | ✅ | ✅ |
| Python 3.11 | ❌ | ❌ | ❌ | ✅ | ✅ |
| Python 3.12 | ❌ | ❌ | ❌ | ❌ | ❌ |


> [!NOTE]
@@ -56,7 +84,7 @@ Different versions of Spark play nicely with only certain versions of Python bel
## Supported backends

- Pandas: ([See documentation](https://capitalone.github.io/datacompy/pandas_usage.html))
- Spark: ([See documentation](https://capitalone.github.io/datacompy/spark_usage.html))
- Spark (Pandas on Spark API): ([See documentation](https://capitalone.github.io/datacompy/spark_usage.html))
- Polars (Experimental): ([See documentation](https://capitalone.github.io/datacompy/polars_usage.html))
- Fugue is a Python library that provides a unified interface for data processing on Pandas, DuckDB, Polars, Arrow,
Spark, Dask, Ray, and many other backends. DataComPy integrates with Fugue to provide a simple way to compare data
5 changes: 3 additions & 2 deletions datacompy/__init__.py
@@ -13,16 +13,17 @@
# See the License for the specific language governing permissions and
# limitations under the License.

__version__ = "0.11.3"
__version__ = "0.12.0"

from datacompy.core import *
from datacompy.fugue import (
all_columns_match,
all_rows_overlap,
count_matching_rows,
intersect_columns,
is_match,
report,
unq_columns,
)
from datacompy.polars import PolarsCompare
from datacompy.spark import NUMERIC_SPARK_TYPES, SparkCompare
from datacompy.spark import SparkCompare
44 changes: 28 additions & 16 deletions datacompy/core.py
@@ -20,6 +20,7 @@
PROC COMPARE in SAS - i.e. human-readable reporting on the difference between
two dataframes.
"""

import logging
import os
from typing import Any, Dict, List, Optional, Union, cast
@@ -283,7 +284,11 @@ def _dataframe_merge(self, ignore_spaces: bool) -> None:
self.df2[column] = self.df2[column].str.strip()

outer_join = self.df1.merge(
self.df2, how="outer", suffixes=("_df1", "_df2"), indicator=True, **params
self.df2,
how="outer",
suffixes=("_" + self.df1_name, "_" + self.df2_name),
indicator=True,
**params,
)
# Clean up temp columns for duplicate row matching
if self._any_dupes:
@@ -295,8 +300,8 @@ def _dataframe_merge(self, ignore_spaces: bool) -> None:
self.df1.drop(order_column, axis=1, inplace=True)
self.df2.drop(order_column, axis=1, inplace=True)

df1_cols = get_merged_columns(self.df1, outer_join, "_df1")
df2_cols = get_merged_columns(self.df2, outer_join, "_df2")
df1_cols = get_merged_columns(self.df1, outer_join, self.df1_name)
df2_cols = get_merged_columns(self.df2, outer_join, self.df2_name)

LOG.debug("Selecting df1 unique rows")
self.df1_unq_rows = outer_join[outer_join["_merge"] == "left_only"][
@@ -334,8 +339,8 @@ def _intersect_compare(self, ignore_spaces: bool, ignore_case: bool) -> None:
max_diff = 0.0
null_diff = 0
else:
col_1 = column + "_df1"
col_2 = column + "_df2"
col_1 = column + "_" + self.df1_name
col_2 = column + "_" + self.df2_name
col_match = column + "_match"
self.intersect_rows[col_match] = columns_equal(
self.intersect_rows[col_1],
@@ -484,7 +489,10 @@ def sample_mismatch(
match_cnt = col_match.sum()
sample_count = min(sample_count, row_cnt - match_cnt)
sample = self.intersect_rows[~col_match].sample(sample_count)
return_cols = self.join_columns + [column + "_df1", column + "_df2"]
return_cols = self.join_columns + [
column + "_" + self.df1_name,
column + "_" + self.df2_name,
]
to_return = sample[return_cols]
if for_display:
to_return.columns = pd.Index(
@@ -517,8 +525,8 @@ def all_mismatch(self, ignore_matching_cols: bool = False) -> pd.DataFrame:
orig_col_name = col[:-6]

col_comparison = columns_equal(
self.intersect_rows[orig_col_name + "_df1"],
self.intersect_rows[orig_col_name + "_df2"],
self.intersect_rows[orig_col_name + "_" + self.df1_name],
self.intersect_rows[orig_col_name + "_" + self.df2_name],
self.rel_tol,
self.abs_tol,
self.ignore_spaces,
@@ -530,7 +538,12 @@ def all_mismatch(self, ignore_matching_cols: bool = False) -> pd.DataFrame:
):
LOG.debug(f"Adding column {orig_col_name} to the result.")
match_list.append(col)
return_list.extend([orig_col_name + "_df1", orig_col_name + "_df2"])
return_list.extend(
[
orig_col_name + "_" + self.df1_name,
orig_col_name + "_" + self.df2_name,
]
)
elif ignore_matching_cols:
LOG.debug(
f"Column {orig_col_name} is equal in df1 and df2. It will not be added to the result."
@@ -613,7 +626,6 @@ def df_to_str(pdf: pd.DataFrame) -> str:
)

# Column Matching
cnt_intersect = self.intersect_rows.shape[0]
report += render(
"column_comparison.txt",
len([col for col in self.column_stats if col["unequal_cnt"] > 0]),
@@ -804,7 +816,7 @@ def columns_equal(
compare = pd.Series(
(col_1 == col_2) | (col_1.isnull() & col_2.isnull())
)
except:
except Exception:
# Blanket exception should just return all False
compare = pd.Series(False, index=col_1.index)
compare.index = col_1.index
@@ -842,13 +854,13 @@ def compare_string_and_date_columns(
(pd.to_datetime(obj_column) == date_column)
| (obj_column.isnull() & date_column.isnull())
)
except:
except Exception:
try:
return pd.Series(
(pd.to_datetime(obj_column, format="mixed") == date_column)
| (obj_column.isnull() & date_column.isnull())
)
except:
except Exception:
return pd.Series(False, index=col_1.index)


@@ -871,8 +883,8 @@ def get_merged_columns(
for col in original_df.columns:
if col in merged_df.columns:
columns.append(col)
elif col + suffix in merged_df.columns:
columns.append(col + suffix)
elif col + "_" + suffix in merged_df.columns:
columns.append(col + "_" + suffix)
else:
raise ValueError("Column not found: %s", col)
return columns
@@ -920,7 +932,7 @@ def calculate_max_diff(col_1: "pd.Series[Any]", col_2: "pd.Series[Any]") -> floa
"""
try:
return cast(float, (col_1.astype(float) - col_2.astype(float)).abs().max())
except:
except Exception:
return 0.0


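To make the merge-suffix change above concrete, here is a minimal, hedged sketch against the pandas ``Compare`` API; the output column names in the comment are assumptions that follow from the new ``"_" + df1_name`` / ``"_" + df2_name`` suffixing:

```python
import pandas as pd

import datacompy

original = pd.DataFrame({"id": [1, 2], "value": [1.0, 2.0]})
new = pd.DataFrame({"id": [1, 2], "value": [1.0, 2.5]})

compare = datacompy.Compare(
    original, new, join_columns="id", df1_name="original", df2_name="new"
)
compare.matches()
# With this change, mismatch output columns are suffixed with the dataframe
# names (e.g. value_original / value_new) instead of the fixed value_df1 / value_df2.
print(compare.all_mismatch())
```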
96 changes: 95 additions & 1 deletion datacompy/fugue.py
@@ -291,6 +291,101 @@ def all_rows_overlap(
return all(overlap)


def count_matching_rows(
df1: AnyDataFrame,
df2: AnyDataFrame,
join_columns: Union[str, List[str]],
abs_tol: float = 0,
rel_tol: float = 0,
df1_name: str = "df1",
df2_name: str = "df2",
ignore_spaces: bool = False,
ignore_case: bool = False,
cast_column_names_lower: bool = True,
parallelism: Optional[int] = None,
strict_schema: bool = False,
) -> int:
"""Count the number of rows match (on overlapping fields)

Parameters
----------
df1 : ``AnyDataFrame``
First dataframe to check
df2 : ``AnyDataFrame``
Second dataframe to check
join_columns : list or str, optional
Column(s) to join dataframes on. If a string is passed in, that one
column will be used.
abs_tol : float, optional
Absolute tolerance between two values.
rel_tol : float, optional
Relative tolerance between two values.
df1_name : str, optional
A string name for the first dataframe. This allows the reporting to
print out an actual name instead of "df1", and allows human users to
more easily track the dataframes.
df2_name : str, optional
A string name for the second dataframe
ignore_spaces : bool, optional
Flag to strip whitespace (including newlines) from string columns (including any join
columns)
ignore_case : bool, optional
Flag to ignore the case of string columns
cast_column_names_lower: bool, optional
Boolean indicator that controls whether column names will be cast into lower case
parallelism: int, optional
An integer representing the amount of parallelism. Entering a value for this
will force the use of Fugue over just vanilla Pandas
strict_schema: bool, optional
The schema must match exactly if set to ``True``. This includes the names and types. Allows for a fast fail.

Returns
-------
int
Number of matching rows
"""
if (
isinstance(df1, pd.DataFrame)
and isinstance(df2, pd.DataFrame)
and parallelism is None # user did not specify parallelism
and fa.get_current_parallelism() == 1 # currently on a local execution engine
):
comp = Compare(
df1=df1,
df2=df2,
join_columns=join_columns,
abs_tol=abs_tol,
rel_tol=rel_tol,
df1_name=df1_name,
df2_name=df2_name,
ignore_spaces=ignore_spaces,
ignore_case=ignore_case,
cast_column_names_lower=cast_column_names_lower,
)
return comp.count_matching_rows()

try:
count_matching_rows = _distributed_compare(
df1=df1,
df2=df2,
join_columns=join_columns,
return_obj_func=lambda comp: comp.count_matching_rows(),
abs_tol=abs_tol,
rel_tol=rel_tol,
df1_name=df1_name,
df2_name=df2_name,
ignore_spaces=ignore_spaces,
ignore_case=ignore_case,
cast_column_names_lower=cast_column_names_lower,
parallelism=parallelism,
strict_schema=strict_schema,
)
except _StrictSchemaError:
return False

return sum(count_matching_rows)


def report(
df1: AnyDataFrame,
df2: AnyDataFrame,
@@ -460,7 +555,6 @@ def _any(col: str) -> int:
any_mismatch = len(match_sample) > 0

# Column Matching
cnt_intersect = shape0("intersect_rows_shape")
rpt += render(
"column_comparison.txt",
len([col for col in column_stats if col["unequal_cnt"] > 0]),
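For context, a brief, hedged usage sketch of the new ``count_matching_rows`` helper, which is re-exported at the package root per the ``__init__.py`` change above; the data and expected result are illustrative assumptions:

```python
import pandas as pd

import datacompy

df1 = pd.DataFrame({"id": [1, 2, 3], "value": ["a", "b", "c"]})
df2 = pd.DataFrame({"id": [1, 2, 3], "value": ["a", "b", "x"]})

# On plain pandas inputs with no parallelism requested, the helper falls back
# to the local Compare path and returns comp.count_matching_rows().
n = datacompy.count_matching_rows(df1, df2, join_columns="id")
print(n)  # expected: 2 (two of the three joined rows match on all overlapping columns)
```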