Add qualification support for Photon jobs in the Python Tool #1403

Closed
2 changes: 0 additions & 2 deletions .pylintrc
@@ -56,8 +56,6 @@ disable=
# R0913: Too many arguments
too-many-arguments,
# R0914: Too many local variables
too-many-positional-arguments,
# R0917: Too many positional arguments
too-many-locals,
# R0801: Similar lines in 2 files
duplicate-code,
56 changes: 51 additions & 5 deletions user_tools/src/spark_rapids_pytools/rapids/qualification.py
@@ -34,7 +34,7 @@
from spark_rapids_tools.tools.cluster_config_recommender import ClusterConfigRecommender
from spark_rapids_tools.tools.qualx.qualx_main import predict
from spark_rapids_tools.tools.qualification_stats_report import SparkQualificationStats
from spark_rapids_tools.tools.speedup_category import SpeedupCategory
from spark_rapids_tools.tools.speedup_category import SpeedupCategory, SpeedupStrategyBuilder
from spark_rapids_tools.tools.top_candidates import TopCandidates
from spark_rapids_tools.tools.unsupported_ops_stage_duration import UnsupportedOpsStageDuration
from spark_rapids_tools.utils.util import Utilities
@@ -289,12 +289,12 @@ def __build_global_report_summary(self,
total_apps: pd.DataFrame,
unsupported_ops_df: pd.DataFrame,
output_files_info: JSONPropertiesContainer) -> QualificationSummary:
# TODO: This method does a lot of critical but unrelated work. Refactor this into smaller steps/methods
# to improve readability and maintainability.
if all_apps.empty:
# No need to run saving estimator or process the data frames.
return QualificationSummary(total_apps=total_apps, tools_processed_apps=all_apps)

unsupported_ops_obj = UnsupportedOpsStageDuration(self.ctxt.get_value('local', 'output',
'unsupportedOperators'))
# Generate the statistics report
try:
stats_report = SparkQualificationStats(ctxt=self.ctxt)
@@ -303,37 +303,55 @@
self.logger.error('Failed to generate the statistics report: %s', e)

# Calculate unsupported operators stage duration before grouping
unsupported_ops_obj = UnsupportedOpsStageDuration(
self.ctxt.get_value('local', 'output', 'unsupportedOperators'))
all_apps = unsupported_ops_obj.prepare_apps_with_unsupported_stages(all_apps, unsupported_ops_df)
apps_pruned_df = self.__remap_columns_and_prune(all_apps)

# Apply additional heuristics to skip apps not suitable for GPU acceleration
heuristics_ob = AdditionalHeuristics(
props=self.ctxt.get_value('local', 'output', 'additionalHeuristics'),
tools_output_dir=self.ctxt.get_rapids_output_folder(),
output_file=output_files_info.get_value('intermediateOutput', 'files', 'heuristics', 'path'))
apps_pruned_df = heuristics_ob.apply_heuristics(apps_pruned_df)
speedup_category_ob = SpeedupCategory(self.ctxt.get_value('local', 'output', 'speedupCategories'))

# Group the applications and recalculate metrics
apps_grouped_df, group_notes = self.__group_apps_by_name(apps_pruned_df)

# Add a column for speedup categories, using different categorization strategies based on application type
spark_properties = self._read_qualification_metric_file('spark_properties.csv')
speedup_category_confs = self.ctxt.get_value('local', 'output', 'speedupCategories')
speedup_strategy = SpeedupStrategyBuilder.build_speedup_strategy(
platform=self.ctxt.platform,
spark_properties=spark_properties,
speedup_strategy_props=speedup_category_confs.get('strategies'))
speedup_category_ob = SpeedupCategory(speedup_category_confs, speedup_strategy)
df_final_result = speedup_category_ob.build_category_column(apps_grouped_df)

# Generate the cluster shape report
reshaped_notes = self.__generate_cluster_shape_report()
report_comments = [group_notes] if group_notes else []
if reshaped_notes:
report_comments.append(reshaped_notes)

# Write the final result to the output file
csv_out = output_files_info.get_value('summary', 'path')
if not df_final_result.empty:
self.logger.info('Generating GPU Estimated Speedup: as %s', csv_out)
df_final_result.to_csv(csv_out, float_format='%.2f')

# Add columns for cluster configuration recommendations and tuning configurations to the processed_apps.
recommender = ClusterConfigRecommender(self.ctxt)
df_final_result = recommender.add_cluster_and_tuning_recommendations(df_final_result)
# Merge the total_apps with the processed_apps to get the Event Log
df_final_result = pd.merge(df_final_result, total_apps[['Event Log', 'AppID']],
left_on='App ID', right_on='AppID')

# Write the app metadata
app_metadata_info = output_files_info.get_value('appMetadata')
config_recommendations_info = output_files_info.get_value('configRecommendations')
self._write_app_metadata(df_final_result, app_metadata_info, config_recommendations_info)

# Return the summary
return QualificationSummary(total_apps=total_apps,
tools_processed_apps=df_final_result,
comments=report_comments)
@@ -595,6 +613,34 @@ def _read_qualification_output_file(self, report_name_key: str, file_format_key:
report_file_path = FSUtil.build_path(self.ctxt.get_rapids_output_folder(), report_file_name)
return pd.read_csv(report_file_path)

def _read_qualification_metric_file(self, file_name: str) -> Dict[str, pd.DataFrame]:
"""
Helper method to read and aggregate metric files from the qualification tool's output metric folder.
Returns a dictionary of DataFrames, where each key is an application ID, and each
DataFrame contains the corresponding application's metrics data.
Example:
{
'appId1': pd.DataFrame(...),
'appId2': pd.DataFrame(...),
}
:param file_name: Name of the metric file to read from each application's folder
"""
metrics = {}
metric_dir = self.ctxt.get_metrics_output_folder()
for app_id_dir in FSUtil.get_subdirectories(metric_dir):
app_id_name = FSUtil.get_resource_name(app_id_dir)
report_file_path = FSUtil.build_path(app_id_dir, file_name)
try:
metrics[app_id_name] = pd.read_csv(report_file_path)
except Exception as e: # pylint: disable=broad-except
# Some apps may not have the given metrics file. Keep an empty entry for
# every app so that downstream lookups never raise a KeyError and
# processing stays consistent.
metrics[app_id_name] = pd.DataFrame()
self.logger.warning('Unable to read metrics file for app %s. Reason - %s:%s',
app_id_name, type(e).__name__, e)
return metrics


@dataclass
class QualificationAsLocal(Qualification):
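For orientation, the per-app aggregation done by _read_qualification_metric_file can be sketched with plain os and pandas. The layout (one subdirectory per application ID under the metrics output folder, each holding CSV reports) follows the method above; the concrete paths and the helper name here are illustrative only, not part of the PR:

import os
import pandas as pd

def read_metric_per_app(metrics_dir: str, file_name: str) -> dict:
    """Collect <metrics_dir>/<app_id>/<file_name> into {app_id: DataFrame}."""
    metrics = {}
    for app_id in os.listdir(metrics_dir):
        csv_path = os.path.join(metrics_dir, app_id, file_name)
        try:
            metrics[app_id] = pd.read_csv(csv_path)
        except (OSError, pd.errors.EmptyDataError):
            # Missing or empty file: keep an empty entry so every app has a key.
            metrics[app_id] = pd.DataFrame()
    return metrics

# Illustrative path; the tool resolves the real folder from its own context.
spark_props = read_metric_per_app('qual_output/raw_metrics', 'spark_properties.csv')
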
5 changes: 5 additions & 0 deletions user_tools/src/spark_rapids_pytools/rapids/tool_ctxt.py
@@ -215,6 +215,11 @@ def get_rapids_output_folder(self) -> str:
return root_dir
return FSUtil.build_path(root_dir, rapids_subfolder)

def get_metrics_output_folder(self) -> str:
root_dir = self.get_rapids_output_folder()
metrics_subfolder = self.get_value('toolOutput', 'metricsSubFolder')
return FSUtil.build_path(root_dir, metrics_subfolder)

def get_log4j_properties_file(self) -> str:
return self.get_value_silent('toolOutput', 'textFormat', 'log4jFileName')

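In effect, the new accessor appends the configured metricsSubFolder value to the RAPIDS output folder. A rough equivalent with os.path (the root path below is hypothetical; the tool itself goes through FSUtil.build_path):

import os

root_dir = '/tmp/qual_20240101/rapids_4_spark_qualification_output'  # hypothetical
metrics_dir = os.path.join(root_dir, 'raw_metrics')  # value of toolOutput.metricsSubFolder
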
@@ -1,6 +1,7 @@
toolOutput:
completeOutput: true
subFolder: rapids_4_spark_qualification_output
metricsSubFolder: raw_metrics
textFormat:
summaryLog:
fileName: rapids_4_spark_qualification_output.log
@@ -254,27 +255,50 @@ local:
speedupColumnName: 'Estimated GPU Speedup'
categoryColumnName: 'Estimated GPU Speedup Category'
heuristicsColumnName: 'Skip by Heuristics'
categories:
- title: 'Not Applicable'
lowerBound: -1000000.0
upperBound: 1.3
- title: 'Small'
lowerBound: 1.3
upperBound: 2.0
- title: 'Medium'
lowerBound: 2.0
upperBound: 3.0
- title: 'Large'
lowerBound: 3.0
upperBound: 1000000.0
eligibilityConditions:
- columnName: 'Estimated GPU Speedup'
lowerBound: 1.3
upperBound: 1000000.0
- columnName: 'Unsupported Operators Stage Duration Percent'
lowerBound: 0.0
upperBound: 25.0
defaultCategory: 'Not Recommended'
strategies:
spark: # Spark specific speedup categories
categories:
- title: 'Not Applicable'
lowerBound: -1000000.0
upperBound: 1.3
- title: 'Small'
lowerBound: 1.3
upperBound: 2.0
- title: 'Medium'
lowerBound: 2.0
upperBound: 3.0
- title: 'Large'
lowerBound: 3.0
upperBound: 1000000.0
eligibilityConditions:
- columnName: 'Estimated GPU Speedup'
lowerBound: 1.3
upperBound: 1000000.0
- columnName: 'Unsupported Operators Stage Duration Percent'
lowerBound: 0.0
upperBound: 25.0
photon: # Photon specific speedup categories
categories:
- title: 'Not Applicable'
lowerBound: -1000000.0
upperBound: 1.0
- title: 'Small'
lowerBound: 1.0
upperBound: 2.0
- title: 'Medium'
lowerBound: 2.0
upperBound: 3.0
- title: 'Large'
lowerBound: 3.0
upperBound: 1000000.0
eligibilityConditions:
- columnName: 'Estimated GPU Speedup'
lowerBound: 1.0
upperBound: 1000000.0
- columnName: 'Unsupported Operators Stage Duration Percent'
lowerBound: 0.0
upperBound: 25.0
additionalHeuristics:
appInfo:
fileName: 'application_information.csv'
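A minimal sketch of how one entry of the strategies block above could be turned into a SpeedupStrategy. The YAML loading and the file name are assumptions for illustration; inside the tool the same values are read through its properties container:

import yaml

# Hypothetical path to the qualification configuration file shown above.
with open('qualification-conf.yaml', encoding='utf-8') as fh:
    conf = yaml.safe_load(fh)

strategies = conf['local']['output']['speedupCategories']['strategies']
photon_strategy = SpeedupStrategy(strategies['photon'])
# photon_strategy.get_categories() -> the four title/lowerBound/upperBound entries
# photon_strategy.get_eligibility_conditions() -> per-column bounds (speedup >= 1.0, etc.)
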
13 changes: 13 additions & 0 deletions user_tools/src/spark_rapids_tools/enums.py
@@ -210,3 +210,16 @@ def create_default_model_args(cls, model_type: str) -> dict:
'xgboostEnabled': model_type == QualEstimationModel.XGBOOST,
'customModelFile': None,
}


class AppExecutionType(EnumeratedType):
"""
Represents the execution type for the application (e.g. Spark, Photon or GPU).
"""
SPARK = 'spark'
PHOTON = 'photon'
GPU = 'gpu'

@classmethod
def get_default(cls) -> 'AppExecutionType':
return cls.SPARK
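A short example of the detection this enum enables; the runtime version string is invented, and SpeedupStrategyBuilder (further below) applies the same substring check to the spark.databricks.clusterUsageTags.sparkVersion property:

spark_version = '14.3.x-photon-scala2.12'  # hypothetical Databricks runtime string
app_type = (AppExecutionType.PHOTON
            if AppExecutionType.PHOTON.value in spark_version.lower()
            else AppExecutionType.get_default())
assert app_type == AppExecutionType.PHOTON
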
75 changes: 73 additions & 2 deletions user_tools/src/spark_rapids_tools/tools/speedup_category.py
@@ -19,13 +19,83 @@

import pandas as pd

from spark_rapids_pytools.cloud_api.sp_types import PlatformBase
from spark_rapids_tools import CspEnv
from spark_rapids_tools.enums import AppExecutionType
from spark_rapids_tools.utils import Utilities


@dataclass
class SpeedupStrategy:
"""
Wrapper class for speedup strategy properties.
"""
_categories: list = field(default_factory=list, init=False)
_eligibility_conditions: list = field(default_factory=list, init=False)

def __init__(self, props: dict):
self._categories = props.get('categories', [])
self._eligibility_conditions = props.get('eligibilityConditions', [])

def get_categories(self) -> list:
return self._categories

def get_eligibility_conditions(self) -> list:
return self._eligibility_conditions


@dataclass
class SpeedupStrategyBuilder:
""""
Builder class for creating speedup strategy based on Spark properties.
TODO: This class can be extended to support different speedup strategies for a mixed set of application types.
"""

@classmethod
def build_speedup_strategy(cls,
platform: PlatformBase,
spark_properties: dict,
speedup_strategy_props: dict) -> SpeedupStrategy:
"""
Builds a SpeedupStrategy based on the provided Spark properties of the applications.
This function verifies that all applications belong to the same type and returns the appropriate strategy.

:param platform: Platform for which the speedup strategy is being built.
:param spark_properties: Dictionary of App IDs and corresponding Spark properties.
:param speedup_strategy_props: Dictionary containing the properties for different speedup strategies.
"""
# For non-Databricks platforms, return the default speedup strategy (i.e. Spark CPU based)
if platform.get_platform_name() not in [CspEnv.DATABRICKS_AWS, CspEnv.DATABRICKS_AZURE]:
default_app_type = AppExecutionType.get_default()
return SpeedupStrategy(speedup_strategy_props.get(default_app_type))

detected_app_type = set()
spark_version_key = 'spark.databricks.clusterUsageTags.sparkVersion'

# Detect the application type based on the Spark version
for spark_properties_df in spark_properties.values():
spark_props_dict = Utilities.convert_df_to_dict(spark_properties_df)
spark_version = spark_props_dict.get(spark_version_key, '').lower()
if AppExecutionType.PHOTON in spark_version:
detected_app_type.add(AppExecutionType.PHOTON)
else:
detected_app_type.add(AppExecutionType.get_default())

if len(detected_app_type) != 1:
app_types_str = ', '.join([app_type.value for app_type in detected_app_type])
raise ValueError(f'Expected applications of a single type but found a mix: {app_types_str}')

# Return the SpeedupStrategy based on the detected application type
return SpeedupStrategy(speedup_strategy_props.get(next(iter(detected_app_type))))


@dataclass
class SpeedupCategory:
"""
Encapsulates the logic to categorize the speedup values based on the range values.
"""
props: dict = field(default=None, init=True)
speedup_strategy: SpeedupStrategy = field(default=None, init=True)

def __build_category_column(self, all_apps: pd.DataFrame) -> pd.DataFrame:
"""
@@ -44,7 +114,7 @@ def __build_category_column(self, all_apps: pd.DataFrame) -> pd.DataFrame:
output: row_2 = pd.Series({'speedup': 3.5, 'speedup category': 'Large'})
reason: Speedup Category will be 'Large' because the speedup is within the range (3-100000).
"""
categories = self.props.get('categories')
categories = self.speedup_strategy.get_categories()
category_col_name = self.props.get('categoryColumnName')
speedup_col_name = self.props.get('speedupColumnName')

@@ -74,11 +144,12 @@ def __process_category(self, all_apps: pd.DataFrame) -> pd.DataFrame:
output: row_2 = pd.Series({'criteriaCol1': 15, 'criteriaCol2': 85, 'speedup category': 'Not Recommended'})
reason: Category will be set to 'Not Recommended' because the criteriaCol1 is not within the range (18-30)
"""
eligibility_conditions = self.speedup_strategy.get_eligibility_conditions()
category_col_name = self.props.get('categoryColumnName')
heuristics_col_name = self.props.get('heuristicsColumnName')

def process_row(single_row: pd.Series) -> str:
for entry in self.props.get('eligibilityConditions'):
for entry in eligibility_conditions:
col_value = single_row[entry.get('columnName')]
# If the row is marked to be skipped by heuristics or the value is not within the range,
# set the category to default category (Not Recommended)
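A condensed usage sketch of the two classes above. The property keys mirror the speedupCategories configuration shown earlier; the shortened category list and the DataFrame rows are invented for illustration:

import pandas as pd

strategy = SpeedupStrategy({
    'categories': [
        {'title': 'Small', 'lowerBound': 1.0, 'upperBound': 2.0},
        {'title': 'Medium', 'lowerBound': 2.0, 'upperBound': 3.0},
    ],
    'eligibilityConditions': [
        {'columnName': 'Estimated GPU Speedup', 'lowerBound': 1.0, 'upperBound': 1000000.0},
    ],
})
props = {
    'categoryColumnName': 'Estimated GPU Speedup Category',
    'speedupColumnName': 'Estimated GPU Speedup',
    'heuristicsColumnName': 'Skip by Heuristics',
    'defaultCategory': 'Not Recommended',
}
categorizer = SpeedupCategory(props, strategy)
apps = pd.DataFrame({
    'Estimated GPU Speedup': [1.4, 2.6],
    'Skip by Heuristics': [False, False],
})
apps = categorizer.build_category_column(apps)  # adds the category column per strategy
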
10 changes: 10 additions & 0 deletions user_tools/src/spark_rapids_tools/utils/util.py
@@ -328,3 +328,13 @@ def bytes_to_human_readable(cls, num_bytes: int) -> str:
num_bytes /= 1024.0
i += 1
return f'{num_bytes:.2f} {size_units[i]}'

@classmethod
def convert_df_to_dict(cls, df: pd.DataFrame) -> dict:
"""
Converts a DataFrame with exactly two columns into a dictionary. The first column is used as keys
and the second column as values.
"""
assert len(df.columns) == 2, 'Cannot convert DataFrame to dict, expected 2 columns'
key_col, value_col = df.columns[0], df.columns[1]
return df.set_index(key_col)[value_col].to_dict()
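For example, applied to a two-column properties frame (column names and values are illustrative):

import pandas as pd

df = pd.DataFrame({
    'propertyName': ['spark.databricks.clusterUsageTags.sparkVersion', 'spark.executor.cores'],
    'propertyValue': ['14.3.x-photon-scala2.12', '8'],
})
props = Utilities.convert_df_to_dict(df)
# {'spark.databricks.clusterUsageTags.sparkVersion': '14.3.x-photon-scala2.12',
#  'spark.executor.cores': '8'}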