diff --git a/cleanlab_studio/internal/util.py b/cleanlab_studio/internal/util.py
index d6d54180..280171a1 100644
--- a/cleanlab_studio/internal/util.py
+++ b/cleanlab_studio/internal/util.py
@@ -1,8 +1,9 @@
 import pathlib
-from typing import Any, Optional, TypeVar, Union
+from typing import Any, Optional, TypeVar, Union, List, Dict
 import math
-import numpy as np
+import copy
+
 import pandas as pd
 
 try:
@@ -33,6 +34,28 @@
 DatasetSourceType = TypeVar("DatasetSourceType", bound=Union[tuple(dataset_source_types)])  # type: ignore
 
 
+# Studio team port to backend
+AUTOFIX_DEFAULTS: Dict[str, Dict[str, float]] = {
+    "optimized_training_data": {
+        "drop_ambiguous": 0.0,
+        "drop_label_issue": 0.5,
+        "drop_near_duplicate": 0.2,
+        "drop_outlier": 0.5,
+        "relabel_confidence_threshold": 0.95,
+    },
+    "drop_all_issues": {
+        "drop_ambiguous": 1.0,
+        "drop_label_issue": 1.0,
+        "drop_near_duplicate": 1.0,
+        "drop_outlier": 1.0,
+    },
+    "suggested_actions": {
+        "drop_near_duplicate": 1.0,
+        "drop_outlier": 1.0,
+        "relabel_confidence_threshold": 0.0,
+    },
+}
+
 
 def init_dataset_source(
     dataset_source: DatasetSourceType, dataset_name: Optional[str] = None
@@ -198,6 +221,130 @@ def check_not_none(x: Any) -> bool:
     return not check_none(x)
 
 
+# Studio team port to backend
+def _get_autofix_defaults_for_strategy(strategy: str) -> Dict[str, float]:
+    return AUTOFIX_DEFAULTS[strategy]
+
+
+def _get_param_values(
+    cleanset_df: pd.DataFrame, params: Optional[Dict[str, float]], strategy: str
+) -> Dict[str, Union[int, float]]:
+    thresholds = _get_autofix_defaults_for_strategy(strategy) if params is None else params
+    param_values = {}
+    for param_type, param_value in thresholds.items():
+        # Convert drop fractions to a number of rows; leave the rest of the parameters as is
+        if param_type.startswith("drop_"):
+            issue_name = param_type[5:]  # strip the "drop_" prefix
+            num_rows = cleanset_df[f"is_{issue_name}"].sum()
+            param_values[param_type] = math.ceil(num_rows * param_value)
+        else:
+            param_values[param_type] = param_value
+    return param_values
+
+
+def _get_top_fraction_ids(  # Studio team port to backend
+    cleanset_df: pd.DataFrame, issue_name: str, num_rows: int, asc: bool = True
+) -> List[Any]:
+    """
+    Return the IDs of the data points to drop for a given issue type, given the number of rows to drop during autofix.
+
+    Parameters:
+        - cleanset_df (pd.DataFrame): The input DataFrame containing the cleanset.
+        - issue_name (str): The name of the issue type for which the top rows should be extracted.
+        - num_rows (int): The number of rows to be extracted.
+        - asc (bool, optional): If True, the rows are sorted in ascending order based on the score column; if False, in descending order.
+          Default is True.
+
+    Returns:
+        - list: The `cleanlab_row_ID` values of the top `num_rows` rows, ranked by the issue's score column.
+ """ + bool_column_name = f"is_{issue_name}" + + # Construct a filter based on the 'label_issue' variable + filter_condition = cleanset_df[bool_column_name] + + # Create a new DataFrame based on the filter + filtered_df = cleanset_df[filter_condition] + if issue_name == "near_duplicate": + # Group by the 'near_duplicate_cluster_ID' column + df_n = filtered_df.sort_values(by="near_duplicate_score").reset_index(drop=True) + sorted_df = df_n.head(num_rows) + grouped_df = sorted_df.groupby("near_duplicate_cluster_id") + + # Initialize an empty list to store the aggregated indices + aggregated_indices = [] + + # Iterate over each group + for group_name, group_df in grouped_df: + # Sort the group DataFrame by the 'near_duplicate_score' column in ascending order + sorted_group_df = group_df.sort_values( + by=f"{issue_name}_score", ascending=asc + ).reset_index(drop=True) + + # Extract every other index and append to the aggregated indices list + selected_indices = sorted_group_df.loc[::2, "cleanlab_row_ID"] + aggregated_indices.extend(selected_indices) + + return aggregated_indices + else: + # Construct the boolean column name with 'is_' prefix and 'label_issue_score' suffix + score_col_name = f"{issue_name}_score" + + # Sort the filtered DataFrame by the constructed boolean column in descending order + sorted_df = filtered_df.sort_values(by=score_col_name, ascending=asc) + + # Extract the top specified number of rows and return the 'cleanlab_row_ID' column + top_rows_ids = sorted_df["cleanlab_row_ID"].head(num_rows) + + return top_rows_ids + + +def _update_label_based_on_confidence(row, conf_threshold): # Studio team port to backend + """Update the label and is_issue based on confidence threshold if there is a label issue. + + Args: + row (pd.Series): The row containing label information. + conf_threshold (float): The confidence threshold for updating the label. + + Returns: + pd.Series: The updated row. + """ + if row["is_label_issue"] and row["suggested_label_confidence_score"] > conf_threshold: + # make sure this does not affect back end. 
+        row["is_issue"] = False
+        row["is_label_issue"] = False
+        row["label"] = row["suggested_label"]
+    return row
+
+
+def apply_autofixed_cleanset_to_new_dataframe(  # Studio team port to backend
+    original_df: pd.DataFrame, cleanset_df: pd.DataFrame, parameters: Dict[str, Union[int, float]]
+) -> pd.DataFrame:
+    """Apply a cleanset to update the original dataset's labels and drop the worst rows, per the specified parameters."""
+    original_df_copy = copy.deepcopy(original_df)
+    original_columns = original_df_copy.columns
+    merged_df = pd.merge(original_df_copy, cleanset_df, left_index=True, right_on="cleanlab_row_ID")
+
+    # Relabel confident label issues; the default threshold of 1.0 disables relabeling when
+    # no threshold is configured (e.g. for the 'drop_all_issues' strategy).
+    merged_df = merged_df.apply(
+        lambda row: _update_label_based_on_confidence(
+            row, conf_threshold=parameters.get("relabel_confidence_threshold", 1.0)
+        ),
+        axis=1,
+    )
+
+    indices_to_drop = _get_indices_to_drop(merged_df, parameters)
+
+    merged_df = merged_df.drop(indices_to_drop, axis=0)
+    return merged_df[original_columns]
+
+
+def _get_indices_to_drop(
+    merged_df: pd.DataFrame, parameters: Dict[str, Union[int, float]]
+) -> List[Any]:
+    indices_to_drop = set()
+    for param_name, top_num in parameters.items():
+        if param_name.startswith("drop_"):
+            issue_name = param_name.replace("drop_", "")
+            top_percent_ids = _get_top_fraction_ids(merged_df, issue_name, top_num, asc=False)
+            indices_to_drop.update(top_percent_ids)
+    return list(indices_to_drop)
+
+
 def quote(s: str) -> str:
     return f'"{s}"'
diff --git a/cleanlab_studio/studio/studio.py b/cleanlab_studio/studio/studio.py
index 9c6509c6..5d13c769 100644
--- a/cleanlab_studio/studio/studio.py
+++ b/cleanlab_studio/studio/studio.py
@@ -1,8 +1,8 @@
 """
 Python API for Cleanlab Studio.
 """
-from typing import Any, List, Literal, Optional, Union
 import warnings
+from typing import Any, List, Literal, Optional, Union, Dict
 
 import numpy as np
 import numpy.typing as npt
@@ -15,14 +15,17 @@
 from cleanlab_studio.internal.api import api
 from cleanlab_studio.internal.util import (
     init_dataset_source,
-    check_none,
     apply_corrections_snowpark_df,
     apply_corrections_spark_df,
     apply_corrections_pd_df,
+    apply_autofixed_cleanset_to_new_dataframe,
+    _get_autofix_defaults_for_strategy,
+    _get_param_values,
 )
 from cleanlab_studio.internal.settings import CleanlabSettings
 from cleanlab_studio.internal.types import FieldSchemaDict
 
+
 _snowflake_exists = api.snowflake_exists
 if _snowflake_exists:
     import snowflake.snowpark as snowpark
@@ -150,10 +153,10 @@ def apply_corrections(self, cleanset_id: str, dataset: Any, keep_excluded: bool
             cl_cols = self.download_cleanlab_columns(
                 cleanset_id, to_spark=True, include_project_details=True
             )
-            corrected_ds: pyspark.sql.DataFrame = apply_corrections_spark_df(
+            pyspark_corrected_ds: pyspark.sql.DataFrame = apply_corrections_spark_df(
                 dataset, cl_cols, id_col, label_col, keep_excluded
             )
-            return corrected_ds
+            return pyspark_corrected_ds
 
         elif isinstance(dataset, pd.DataFrame):
             cl_cols = self.download_cleanlab_columns(cleanset_id, include_project_details=True)
@@ -358,3 +361,54 @@ def poll_cleanset_status(self, cleanset_id: str, timeout: Optional[int] = None)
 
         except (TimeoutError, CleansetError):
             return False
+
+    def autofix_dataset(
+        self,
+        original_df: pd.DataFrame,
+        cleanset_id: str,
+        params: Optional[Dict[str, Union[int, float]]] = None,
+        strategy: str = "optimized_training_data",
+    ) -> pd.DataFrame:
+        """
+        Improves a dataset by applying automatically-suggested corrections based on issues detected by Cleanlab.
+
+        Args:
+            original_df (pd.DataFrame): The original dataset (must be a DataFrame, so only text and tabular datasets are currently supported).
+            cleanset_id (str): ID of the cleanset from the Project for this Dataset.
+            params (dict, optional): Optional parameters to control how many data points from each type of detected data issue are auto-corrected or filtered (prioritizing the more severe instances of each issue). If not provided, default `params` values will be used.
+                The `params` dictionary includes the following options:
+
+                * drop_ambiguous (float): Fraction of the data points detected as ambiguous to exclude from the dataset.
+                * drop_label_issue (float): Fraction of the data points with label issues to exclude from the dataset.
+                * drop_near_duplicate (float): Fraction of the data points detected as near duplicates to exclude from the dataset.
+                * drop_outlier (float): Fraction of the data points detected as outliers to exclude from the dataset.
+                * relabel_confidence_threshold (float): Confidence threshold for the suggested label; data points with label issues whose suggested-label confidence exceeds this threshold are re-labeled with the suggested label.
+
+            strategy (str): Which strategy to use for auto-fixing the dataset, out of the following possibilities:
+                ['optimized_training_data', 'drop_all_issues', 'suggested_actions'].
+                Each of these possibilities corresponds to a default setting of the `params` dictionary, designed to be used in different scenarios.
+                If specified, the `params` argument will override this argument. Specify 'optimized_training_data' when your goal is to auto-fix training data to achieve the best ML performance on randomly split test data.
+                Specify 'drop_all_issues' to instead exclude all data points detected to have issues from the dataset.
+                Specify 'suggested_actions' to instead apply the suggested action to each data point that is displayed in the Cleanlab Studio Web Application (e.g. relabeling for label issues, dropping for outliers, etc.).
+
+        Returns:
+            pd.DataFrame: A new DataFrame produced by applying the cleanset's auto-fixes to the original dataset.
+        """
+        cleanset_df = self.download_cleanlab_columns(cleanset_id)
+        param_values = _get_param_values(cleanset_df, params, strategy)
+        return apply_autofixed_cleanset_to_new_dataframe(original_df, cleanset_df, param_values)
+
+    def get_autofix_defaults(self, strategy: str = "optimized_training_data") -> Dict[str, float]:
+        """
+        Returns the default `params` used by `autofix_dataset` for the given auto-fixing strategy.
+
+        Args:
+            strategy (str): Auto-fixing strategy; one of: 'optimized_training_data', 'drop_all_issues', 'suggested_actions'.
+
+        Returns:
+            dict[str, float]: Parameter dictionary containing the confidence threshold for auto-relabeling, and the
+            fraction of rows to drop for each issue type.
+ """ + return _get_autofix_defaults_for_strategy(strategy) diff --git a/tests/test_autofix_utils.py b/tests/test_autofix_utils.py new file mode 100644 index 00000000..69fa04ba --- /dev/null +++ b/tests/test_autofix_utils.py @@ -0,0 +1,174 @@ +import pandas as pd +import pytest +from cleanlab_studio.internal.util import ( + _get_param_values, + _update_label_based_on_confidence, + _get_top_fraction_ids, + _get_indices_to_drop, +) +import numpy as np + + +class TestAutofix: + @pytest.mark.parametrize( + "strategy, expected_results", + [ + ( + "optimized_training_data", + { + "drop_ambiguous": 0, + "drop_label_issue": 2, + "drop_near_duplicate": 2, + "drop_outlier": 3, + "relabel_confidence_threshold": 0.95, + }, + ), + ( + "drop_all_issues", + { + "drop_ambiguous": 10, + "drop_label_issue": 3, + "drop_near_duplicate": 6, + "drop_outlier": 6, + }, + ), + ( + "suggested_actions", + { + "drop_near_duplicate": 6, + "drop_outlier": 6, + "relabel_confidence_threshold": 0.0, + }, + ), + ], + ids=["optimized_training_data", "drop_all_issues", "suggested_actions"], + ) + def test_get_param_values(self, strategy, expected_results): + cleanlab_columns = pd.DataFrame() + cleanlab_columns["is_label_issue"] = [True] * 3 + [False] * 7 + cleanlab_columns["is_near_duplicate"] = [True] * 6 + [False] * 4 + cleanlab_columns["is_outlier"] = [True] * 6 + [False] * 4 + cleanlab_columns["is_ambiguous"] = [True] * 10 + + params = _get_param_values(cleanlab_columns, None, strategy) + assert params == expected_results + + @pytest.mark.parametrize( + "row, expected_updated_row", + [ + ( + { + "is_label_issue": True, + "suggested_label_confidence_score": 0.6, + "label": "label_0", + "suggested_label": "label_1", + "is_issue": True, + }, + { + "is_label_issue": False, + "suggested_label_confidence_score": 0.6, + "label": "label_1", + "suggested_label": "label_1", + "is_issue": False, + }, + ), + ( + { + "is_label_issue": True, + "suggested_label_confidence_score": 0.5, + "label": "label_0", + "suggested_label": "label_1", + "is_issue": True, + }, + { + "is_label_issue": True, + "suggested_label_confidence_score": 0.5, + "label": "label_0", + "suggested_label": "label_1", + "is_issue": True, + }, + ), + ( + { + "is_label_issue": True, + "suggested_label_confidence_score": 0.4, + "label": "label_0", + "suggested_label": "label_1", + "is_issue": True, + }, + { + "is_label_issue": True, + "suggested_label_confidence_score": 0.4, + "label": "label_0", + "suggested_label": "label_1", + "is_issue": True, + }, + ), + ( + { + "is_label_issue": False, + "suggested_label_confidence_score": 0.4, + "label": "label_0", + "suggested_label": "label_1", + "is_issue": True, + }, + { + "is_label_issue": False, + "suggested_label_confidence_score": 0.4, + "label": "label_0", + "suggested_label": "label_1", + "is_issue": True, + }, + ), + ], + ids=[ + "is a label issue with confidence score greater than threshold", + "is a label issue with confidence score equal to threshold", + "is a label issue with confidence score less than threshold", + "is not a label issue", + ], + ) + def test_update_label_based_on_confidence(self, row, expected_updated_row): + conf_threshold = 0.5 + updated_row = _update_label_based_on_confidence(row, conf_threshold) + assert updated_row == expected_updated_row + + def test_get_top_fraction_ids(self): + cleanlab_columns = pd.DataFrame() + + cleanlab_columns["cleanlab_row_ID"] = np.arange(10) + cleanlab_columns["is_dummy"] = [False] * 5 + [True] * 5 + cleanlab_columns["dummy_score"] = np.arange(10) * 0.1 + top_ids = 
_get_top_fraction_ids(cleanlab_columns, "dummy", 3) + assert set(top_ids) == set([5, 6, 7]) + + def test_get_top_fraction_ids_near_duplicate(self): + cleanlab_columns = pd.DataFrame() + + cleanlab_columns["cleanlab_row_ID"] = np.arange(12) + cleanlab_columns["is_near_duplicate"] = [False] * 6 + [True] * 6 + cleanlab_columns["near_duplicate_score"] = np.arange(12) * 0.1 + cleanlab_columns["near_duplicate_cluster_id"] = [None] * 6 + [0, 0, 1, 1, 1, 1] + + top_ids = _get_top_fraction_ids(cleanlab_columns, "near_duplicate", 5) + assert set(top_ids) == set([6, 8, 10]) + + def test_get_indices_to_drop(self): + cleanlab_columns = pd.DataFrame() + cleanlab_columns["cleanlab_row_ID"] = np.arange(10) + cleanlab_columns["is_issue1"] = [True] * 2 + [False] * 8 + cleanlab_columns["issue1_score"] = [1.0, 0.9] + [0] * 8 + cleanlab_columns["is_issue2"] = [False] * 2 + [True] * 4 + [False] * 4 + cleanlab_columns["issue2_score"] = [0] * 2 + [1.0, 0.9, 0.8, 0.7] + [0] * 4 + cleanlab_columns["is_issue3"] = [False] * 4 + [True] * 3 + [False] * 3 + cleanlab_columns["issue3_score"] = [0] * 4 + [1.0, 0.9, 0.8] + [0] * 3 + + params = { + "drop_issue1": 1, + "drop_issue2": 3, + "drop_issue3": 2, + } + expected_indices = [0, 2, 3, 4, 5] + + indices = _get_indices_to_drop(cleanlab_columns, params) + assert set(indices) == set(expected_indices)