diff --git a/user_tools/src/spark_rapids_pytools/rapids/qualification.py b/user_tools/src/spark_rapids_pytools/rapids/qualification.py index 7ceb4d684..b65735d1e 100644 --- a/user_tools/src/spark_rapids_pytools/rapids/qualification.py +++ b/user_tools/src/spark_rapids_pytools/rapids/qualification.py @@ -29,7 +29,7 @@ from spark_rapids_pytools.common.sys_storage import FSUtil from spark_rapids_pytools.common.utilities import Utils, TemplateGenerator from spark_rapids_pytools.rapids.rapids_tool import RapidsJarTool -from spark_rapids_tools.enums import QualFilterApp, QualEstimationModel +from spark_rapids_tools.enums import QualFilterApp, QualEstimationModel, CspEnv, ExecutionEngine from spark_rapids_tools.tools.additional_heuristics import AdditionalHeuristics from spark_rapids_tools.tools.cluster_config_recommender import ClusterConfigRecommender from spark_rapids_tools.tools.qualx.qualx_main import predict @@ -289,12 +289,12 @@ def __build_global_report_summary(self, total_apps: pd.DataFrame, unsupported_ops_df: pd.DataFrame, output_files_info: JSONPropertiesContainer) -> QualificationSummary: + # TODO: This method does a lot of critical but unrelated work. Refactor this into smaller steps/methods + # to improve readability and maintainability. if all_apps.empty: # No need to run saving estimator or process the data frames. return QualificationSummary(total_apps=total_apps, tools_processed_apps=all_apps) - unsupported_ops_obj = UnsupportedOpsStageDuration(self.ctxt.get_value('local', 'output', - 'unsupportedOperators')) # Generate the statistics report try: stats_report = SparkQualificationStats(ctxt=self.ctxt) @@ -303,37 +303,52 @@ def __build_global_report_summary(self, self.logger.error('Failed to generate the statistics report: %s', e) # Calculate unsupported operators stage duration before grouping + unsupported_ops_obj = UnsupportedOpsStageDuration( + self.ctxt.get_value('local', 'output', 'unsupportedOperators')) all_apps = unsupported_ops_obj.prepare_apps_with_unsupported_stages(all_apps, unsupported_ops_df) apps_pruned_df = self.__remap_columns_and_prune(all_apps) + # Apply additional heuristics to skip apps not suitable for GPU acceleration heuristics_ob = AdditionalHeuristics( props=self.ctxt.get_value('local', 'output', 'additionalHeuristics'), tools_output_dir=self.ctxt.get_rapids_output_folder(), output_file=output_files_info.get_value('intermediateOutput', 'files', 'heuristics', 'path')) apps_pruned_df = heuristics_ob.apply_heuristics(apps_pruned_df) - speedup_category_ob = SpeedupCategory(self.ctxt.get_value('local', 'output', 'speedupCategories')) + # Group the applications and recalculate metrics apps_grouped_df, group_notes = self.__group_apps_by_name(apps_pruned_df) - df_final_result = speedup_category_ob.build_category_column(apps_grouped_df) + + # Assign execution engine (Spark/Photon etc.) and speedup categories (Small/Medium/Large) to each application. + # Note: Strategy for speedup categorization will be based on the execution engine of the application. + apps_with_exec_engine_df = self._assign_execution_engine_to_apps(apps_grouped_df) + speedup_category_confs = self.ctxt.get_value('local', 'output', 'speedupCategories') + speedup_category_ob = SpeedupCategory(speedup_category_confs) + df_final_result = speedup_category_ob.build_category_column(apps_with_exec_engine_df) + + # Generate the cluster shape report reshaped_notes = self.__generate_cluster_shape_report() report_comments = [group_notes] if group_notes else [] if reshaped_notes: report_comments.append(reshaped_notes) + # Write the final result to the output file csv_out = output_files_info.get_value('summary', 'path') if not df_final_result.empty: self.logger.info('Generating GPU Estimated Speedup: as %s', csv_out) df_final_result.to_csv(csv_out, float_format='%.2f') + # Add columns for cluster configuration recommendations and tuning configurations to the processed_apps. recommender = ClusterConfigRecommender(self.ctxt) df_final_result = recommender.add_cluster_and_tuning_recommendations(df_final_result) - # Merge the total_apps with the processed_apps to get the Event Log df_final_result = pd.merge(df_final_result, total_apps[['Event Log', 'AppID']], left_on='App ID', right_on='AppID') + # Write the app metadata app_metadata_info = output_files_info.get_value('appMetadata') config_recommendations_info = output_files_info.get_value('configRecommendations') self._write_app_metadata(df_final_result, app_metadata_info, config_recommendations_info) + + # Return the summary return QualificationSummary(total_apps=total_apps, tools_processed_apps=df_final_result, comments=report_comments) @@ -595,6 +610,66 @@ def _read_qualification_output_file(self, report_name_key: str, file_format_key: report_file_path = FSUtil.build_path(self.ctxt.get_rapids_output_folder(), report_file_name) return pd.read_csv(report_file_path) + def _read_qualification_metric_file(self, file_name: str) -> Dict[str, pd.DataFrame]: + """ + Helper method to read metric files from the qualification tool's output metric folder. + Returns a dictionary of DataFrames, where each key is an application ID, and each + DataFrame contains the corresponding application's metrics data. + Example: + { + 'appId1': pd.DataFrame(...), + 'appId2': pd.DataFrame(...), + } + :param file_name: Name of the metric file to read from each application's folder + """ + metrics = {} + root_metric_dir = self.ctxt.get_metrics_output_folder() + for metric_dir in FSUtil.get_subdirectories(root_metric_dir): + app_id_str = FSUtil.get_resource_name(metric_dir) + report_file_path = FSUtil.build_path(metric_dir, file_name) + try: + metrics[app_id_str] = pd.read_csv(report_file_path) + except Exception as e: # pylint: disable=broad-except + # Some apps may not have the given metrics file, we should ensure + # that the dictionary contains entries for all apps to avoid KeyErrors + # and maintain consistency in processing. + metrics[app_id_str] = pd.DataFrame() + self.logger.warning('Unable to read metrics file for app %s. Reason - %s:%s', + app_id_str, type(e).__name__, e) + return metrics + + def _assign_execution_engine_to_apps(self, tools_processed_apps: pd.DataFrame) -> pd.DataFrame: + """ + Assigns the execution engine (Spark/Photon) to each application. This will be used to categorize + applications into speedup categories (Small/Medium/Large). + """ + spark_properties = self._read_qualification_metric_file('spark_properties.csv') + default_exec_engine_type = ExecutionEngine.get_default() + exec_engine_col_name = self.ctxt.get_value('local', 'output', 'speedupCategories', 'execEngineColumnName') + + # Default to Spark-based execution type for non-Databricks platforms + if self.ctxt.platform.get_platform_name() not in {CspEnv.DATABRICKS_AWS, CspEnv.DATABRICKS_AZURE}: + tools_processed_apps[exec_engine_col_name] = default_exec_engine_type + return tools_processed_apps + + # Create a map of App IDs to their execution engine type (Spark/Photon) + spark_version_key = 'spark.databricks.clusterUsageTags.sparkVersion' + exec_engine_map = {} + + for app_id, props_df in spark_properties.items(): + props_dict = Utilities.convert_df_to_dict(props_df) + spark_version = props_dict.get(spark_version_key, '').lower() + if ExecutionEngine.PHOTON.lower() in spark_version: + exec_engine_map[app_id] = ExecutionEngine.PHOTON + else: + exec_engine_map[app_id] = default_exec_engine_type + + # Assign the execution engine type to each application DataFrame row + tools_processed_apps[exec_engine_col_name] = ( + tools_processed_apps['App ID'].map(exec_engine_map).fillna(default_exec_engine_type) + ) + return tools_processed_apps + @dataclass class QualificationAsLocal(Qualification): diff --git a/user_tools/src/spark_rapids_pytools/rapids/tool_ctxt.py b/user_tools/src/spark_rapids_pytools/rapids/tool_ctxt.py index e0756784b..0e4c0b64d 100644 --- a/user_tools/src/spark_rapids_pytools/rapids/tool_ctxt.py +++ b/user_tools/src/spark_rapids_pytools/rapids/tool_ctxt.py @@ -215,6 +215,11 @@ def get_rapids_output_folder(self) -> str: return root_dir return FSUtil.build_path(root_dir, rapids_subfolder) + def get_metrics_output_folder(self) -> str: + root_dir = self.get_rapids_output_folder() + metrics_subfolder = self.get_value('toolOutput', 'metricsSubFolder') + return FSUtil.build_path(root_dir, metrics_subfolder) + def get_log4j_properties_file(self) -> str: return self.get_value_silent('toolOutput', 'textFormat', 'log4jFileName') diff --git a/user_tools/src/spark_rapids_pytools/resources/qualification-conf.yaml b/user_tools/src/spark_rapids_pytools/resources/qualification-conf.yaml index c6edb6431..6d44f73f7 100644 --- a/user_tools/src/spark_rapids_pytools/resources/qualification-conf.yaml +++ b/user_tools/src/spark_rapids_pytools/resources/qualification-conf.yaml @@ -1,6 +1,7 @@ toolOutput: completeOutput: true subFolder: rapids_4_spark_qualification_output + metricsSubFolder: raw_metrics textFormat: summaryLog: fileName: rapids_4_spark_qualification_output.log @@ -148,6 +149,7 @@ local: - 'App Name' - 'Event Log' - 'Cluster Info' + - 'Execution Engine' - 'Estimated GPU Speedup Category' - 'Full Cluster Config Recommendations*' - 'GPU Config Recommendation Breakdown*' @@ -254,27 +256,51 @@ local: speedupColumnName: 'Estimated GPU Speedup' categoryColumnName: 'Estimated GPU Speedup Category' heuristicsColumnName: 'Skip by Heuristics' - categories: - - title: 'Not Applicable' - lowerBound: -1000000.0 - upperBound: 1.3 - - title: 'Small' - lowerBound: 1.3 - upperBound: 2.0 - - title: 'Medium' - lowerBound: 2.0 - upperBound: 3.0 - - title: 'Large' - lowerBound: 3.0 - upperBound: 1000000.0 - eligibilityConditions: - - columnName: 'Estimated GPU Speedup' - lowerBound: 1.3 - upperBound: 1000000.0 - - columnName: 'Unsupported Operators Stage Duration Percent' - lowerBound: 0.0 - upperBound: 25.0 + execEngineColumnName: 'Execution Engine' defaultCategory: 'Not Recommended' + strategies: + spark: # Spark specific speedup categories + categories: + - title: 'Not Applicable' + lowerBound: -1000000.0 + upperBound: 1.3 + - title: 'Small' + lowerBound: 1.3 + upperBound: 2.0 + - title: 'Medium' + lowerBound: 2.0 + upperBound: 3.0 + - title: 'Large' + lowerBound: 3.0 + upperBound: 1000000.0 + eligibilityConditions: + - columnName: 'Estimated GPU Speedup' + lowerBound: 1.3 + upperBound: 1000000.0 + - columnName: 'Unsupported Operators Stage Duration Percent' + lowerBound: 0.0 + upperBound: 25.0 + photon: # Photon specific speedup categories + categories: + - title: 'Not Applicable' + lowerBound: -1000000.0 + upperBound: 1.0 + - title: 'Small' + lowerBound: 1.0 + upperBound: 2.0 + - title: 'Medium' + lowerBound: 2.0 + upperBound: 3.0 + - title: 'Large' + lowerBound: 3.0 + upperBound: 1000000.0 + eligibilityConditions: + - columnName: 'Estimated GPU Speedup' + lowerBound: 1.0 + upperBound: 1000000.0 + - columnName: 'Unsupported Operators Stage Duration Percent' + lowerBound: 0.0 + upperBound: 25.0 additionalHeuristics: appInfo: fileName: 'application_information.csv' diff --git a/user_tools/src/spark_rapids_tools/enums.py b/user_tools/src/spark_rapids_tools/enums.py index 61dca23a6..43b4d12e7 100644 --- a/user_tools/src/spark_rapids_tools/enums.py +++ b/user_tools/src/spark_rapids_tools/enums.py @@ -210,3 +210,15 @@ def create_default_model_args(cls, model_type: str) -> dict: 'xgboostEnabled': model_type == QualEstimationModel.XGBOOST, 'customModelFile': None, } + + +class ExecutionEngine(EnumeratedType): + """ + Represents the execution engine for the application (Spark or Photon). + """ + SPARK = 'spark' + PHOTON = 'photon' + + @classmethod + def get_default(cls) -> 'ExecutionEngine': + return cls.SPARK diff --git a/user_tools/src/spark_rapids_tools/tools/speedup_category.py b/user_tools/src/spark_rapids_tools/tools/speedup_category.py index 15d9b8790..830df4d12 100644 --- a/user_tools/src/spark_rapids_tools/tools/speedup_category.py +++ b/user_tools/src/spark_rapids_tools/tools/speedup_category.py @@ -15,19 +15,44 @@ """Implementation class for Speedup Category logic.""" from dataclasses import dataclass, field -from typing import Optional +from typing import Optional, Dict import pandas as pd +class SpeedupStrategy: + """ + Wrapper class for speedup strategy properties. + """ + _categories: list + _eligibility_conditions: list + + def __init__(self, props: dict): + self._categories = props.get('categories', []) + self._eligibility_conditions = props.get('eligibilityConditions', []) + + def get_categories(self) -> list: + return self._categories + + def get_eligibility_conditions(self) -> list: + return self._eligibility_conditions + + @dataclass class SpeedupCategory: """ Encapsulates the logic to categorize the speedup values based on the range values. """ props: dict = field(default=None, init=True) + speedup_strategies: Dict[str, SpeedupStrategy] = field(default_factory=dict, init=False) + + def __post_init__(self): + strategy_properties = self.props.get('strategies', {}) + # Create a SpeedupStrategy for each execution engine type. + for exec_engine, properties in strategy_properties.items(): # type: str, dict + self.speedup_strategies[exec_engine] = SpeedupStrategy(properties) - def __build_category_column(self, all_apps: pd.DataFrame) -> pd.DataFrame: + def _build_category_column(self, all_apps: pd.DataFrame) -> pd.DataFrame: """ Build the category column based on the range values of the speedup column. Example: @@ -44,20 +69,24 @@ def __build_category_column(self, all_apps: pd.DataFrame) -> pd.DataFrame: output: row_2 = pd.Series({'speedup': 3.5, 'speedup category': 'Large'}) reason: Speedup Category will be 'Large' because the speedup is within the range (3-100000). """ - categories = self.props.get('categories') category_col_name = self.props.get('categoryColumnName') speedup_col_name = self.props.get('speedupColumnName') + exec_engine_col_name = self.props.get('execEngineColumnName') # Calculate the category based on the speedup value - def calculate_category(col_value) -> Optional[str]: + def calculate_category(single_row: pd.Series) -> Optional[str]: + exec_engine = single_row.get(exec_engine_col_name) + # Get the speedup strategy and its categories for the given execution engine type. + categories = self.speedup_strategies.get(exec_engine).get_categories() + col_value = single_row.get(speedup_col_name) for category in categories: if category.get('lowerBound') <= col_value < category.get('upperBound'): return category.get('title') return None - all_apps[category_col_name] = all_apps[speedup_col_name].apply(calculate_category) + all_apps[category_col_name] = all_apps.apply(calculate_category, axis=1) return all_apps - def __process_category(self, all_apps: pd.DataFrame) -> pd.DataFrame: + def _process_category(self, all_apps: pd.DataFrame) -> pd.DataFrame: """ Process the speedup category column based on the eligibility criteria. If the row does not match the criteria, the category column will be set to the `Not Recommended` category. @@ -76,9 +105,13 @@ def __process_category(self, all_apps: pd.DataFrame) -> pd.DataFrame: """ category_col_name = self.props.get('categoryColumnName') heuristics_col_name = self.props.get('heuristicsColumnName') + exec_engine_col_name = self.props.get('execEngineColumnName') def process_row(single_row: pd.Series) -> str: - for entry in self.props.get('eligibilityConditions'): + exec_engine = single_row.get(exec_engine_col_name) + # Get the speedup strategy and its eligibility conditions for the given execution engine type. + eligibility_conditions = self.speedup_strategies.get(exec_engine).get_eligibility_conditions() + for entry in eligibility_conditions: col_value = single_row[entry.get('columnName')] # If the row is marked to be skipped by heuristics or the value is not within the range, # set the category to default category (Not Recommended) @@ -91,6 +124,6 @@ def process_row(single_row: pd.Series) -> str: return all_apps def build_category_column(self, all_apps: pd.DataFrame) -> pd.DataFrame: - apps_with_category = self.__build_category_column(all_apps) - processed_apps = self.__process_category(apps_with_category) + apps_with_category = self._build_category_column(all_apps) + processed_apps = self._process_category(apps_with_category) return processed_apps diff --git a/user_tools/src/spark_rapids_tools/utils/util.py b/user_tools/src/spark_rapids_tools/utils/util.py index 7ceb81ee5..0c94df97f 100644 --- a/user_tools/src/spark_rapids_tools/utils/util.py +++ b/user_tools/src/spark_rapids_tools/utils/util.py @@ -349,3 +349,13 @@ def bytes_to_human_readable(cls, num_bytes: int) -> str: num_bytes /= 1024.0 i += 1 return f'{num_bytes:.2f} {size_units[i]}' + + @classmethod + def convert_df_to_dict(cls, df: pd.DataFrame) -> dict: + """ + Converts a DataFrame with exactly two columns into a dictionary. The first column is used as keys + and the second column as values. + """ + assert len(df.columns) == 2, 'Cannot convert DataFrame to dict, expected 2 columns' + key_col, value_col = df.columns[0], df.columns[1] + return df.set_index(key_col)[value_col].to_dict() diff --git a/user_tools/tests/spark_rapids_tools_e2e/features/event_log_processing.feature b/user_tools/tests/spark_rapids_tools_e2e/features/event_log_processing.feature index cd66b0bb6..2da02266c 100644 --- a/user_tools/tests/spark_rapids_tools_e2e/features/event_log_processing.feature +++ b/user_tools/tests/spark_rapids_tools_e2e/features/event_log_processing.feature @@ -41,3 +41,16 @@ Feature: Event Log Processing Qualification. Raised an error in phase [Execution] """ And return code is "1" + + @test_id_ELP_0003 + Scenario Outline: Qualification tool processes event logs with different execution engine + Given platform is "" + When spark-rapids tool is executed with "" eventlogs + Then "app_metadata.json" contains execution engine as "" + And return code is "0" + + Examples: + | platform | event_logs | execution_engines | + | databricks-aws | join_agg_on_yarn_eventlog.zstd | spark | + | databricks-aws | join_agg_on_yarn_eventlog.zstd,photon_eventlog.zstd | spark;photon | + | dataproc | join_agg_on_yarn_eventlog.zstd | spark | diff --git a/user_tools/tests/spark_rapids_tools_e2e/features/steps/e2e_utils.py b/user_tools/tests/spark_rapids_tools_e2e/features/steps/e2e_utils.py index 4468c0050..d45399698 100644 --- a/user_tools/tests/spark_rapids_tools_e2e/features/steps/e2e_utils.py +++ b/user_tools/tests/spark_rapids_tools_e2e/features/steps/e2e_utils.py @@ -18,11 +18,12 @@ import logging import os +import re import subprocess from attr import dataclass from enum import auto from pathlib import Path -from typing import List +from typing import List, Optional from urllib.parse import urlparse from spark_rapids_tools import EnumeratedType @@ -91,6 +92,17 @@ def create_spark_rapids_cmd(cls, def get_tools_root_path() -> str: return str(Path(__file__).parents[5]) + @staticmethod + def get_tools_output_dir(log_str: str) -> Optional[str]: + """ + Extracts the output directory path from the given log string (e.g. /path/to/qual_2024xxx) + + :param log_str: Log string containing the output directory path. + :return: Directory path if found, otherwise None. + """ + match = re.search(r"tool output: (\/\S+)", log_str) + return os.path.dirname(match.group(1)) if match else None + @staticmethod def get_e2e_tests_root_path() -> str: return str(Path(__file__).parents[2]) diff --git a/user_tools/tests/spark_rapids_tools_e2e/features/steps/test_steps.py b/user_tools/tests/spark_rapids_tools_e2e/features/steps/test_steps.py index e1bd83933..f18ae8246 100644 --- a/user_tools/tests/spark_rapids_tools_e2e/features/steps/test_steps.py +++ b/user_tools/tests/spark_rapids_tools_e2e/features/steps/test_steps.py @@ -15,13 +15,14 @@ """ This module defines steps to be used by the end-to-end tests using behave. """ - +import json import os import shutil import tempfile import threading from time import sleep from typing import Callable +from urllib.parse import urlparse from behave import given, when, then @@ -131,11 +132,13 @@ def step_hdfs_has_eventlogs(context, event_logs) -> None: @when('spark-rapids tool is executed with "{event_logs}" eventlogs') def step_execute_spark_rapids_tool(context, event_logs) -> None: event_logs_list = E2ETestUtils.resolve_event_logs(event_logs.split(",")) + context.event_logs_list = event_logs_list if hasattr(context, 'platform'): cmd = E2ETestUtils.create_spark_rapids_cmd(event_logs_list, context.temp_dir, context.platform) else: cmd = E2ETestUtils.create_spark_rapids_cmd(event_logs_list, context.temp_dir) context.result = E2ETestUtils.run_sys_cmd(cmd) + context.tools_output_dir = E2ETestUtils.get_tools_output_dir(context.result.stdout) @then('stderr contains the following') @@ -170,3 +173,26 @@ def step_verify_num_apps(context, expected_num_apps) -> None: def step_verify_return_code(context, return_code) -> None: assert context.result.returncode == return_code, \ f"Expected return code: {return_code}, Actual return code: {context.result.returncode}" + + +@then('"{metadata_file}" contains execution engine as "{execution_engines}"') +def step_verify_metadata_file(context, metadata_file: str, execution_engines: str) -> None: + # Create a mapping between event log paths and their expected execution engines + exec_engine_map = dict(zip(context.event_logs_list, execution_engines.split(';'))) + app_metadata_file = os.path.join(context.tools_output_dir, metadata_file) + try: + with open(app_metadata_file, 'r') as file: + app_metadata = json.load(file) + assert len(app_metadata) == len(exec_engine_map), ( + f"{app_metadata_file} expected {len(exec_engine_map)} apps, found {len(app_metadata)}" + ) + # Verify that each app's execution engine matches the expected value + for metadata in app_metadata: + actual_engine = metadata.get('executionEngine') + event_log_path = urlparse(metadata.get('eventLog')).path + expected_engine = exec_engine_map.get(event_log_path) + assert actual_engine == expected_engine, ( + f"{app_metadata_file} expected {expected_engine}, found {actual_engine}" + ) + except Exception as e: + raise RuntimeError(f"Failed to verify metadata file: {e}") from e diff --git a/user_tools/tests/spark_rapids_tools_e2e/resources/event_logs/photon_eventlog.zstd b/user_tools/tests/spark_rapids_tools_e2e/resources/event_logs/photon_eventlog.zstd index 2ff810859..505a36b01 100644 Binary files a/user_tools/tests/spark_rapids_tools_e2e/resources/event_logs/photon_eventlog.zstd and b/user_tools/tests/spark_rapids_tools_e2e/resources/event_logs/photon_eventlog.zstd differ