Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add qualification support for Photon jobs in the Python Tool #1409

Draft
wants to merge 6 commits into
base: dev
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
87 changes: 81 additions & 6 deletions user_tools/src/spark_rapids_pytools/rapids/qualification.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@
from spark_rapids_pytools.common.sys_storage import FSUtil
from spark_rapids_pytools.common.utilities import Utils, TemplateGenerator
from spark_rapids_pytools.rapids.rapids_tool import RapidsJarTool
from spark_rapids_tools.enums import QualFilterApp, QualEstimationModel
from spark_rapids_tools.enums import QualFilterApp, QualEstimationModel, CspEnv, ExecutionEngine
from spark_rapids_tools.tools.additional_heuristics import AdditionalHeuristics
from spark_rapids_tools.tools.cluster_config_recommender import ClusterConfigRecommender
from spark_rapids_tools.tools.qualx.qualx_main import predict
Expand Down Expand Up @@ -289,12 +289,12 @@ def __build_global_report_summary(self,
total_apps: pd.DataFrame,
unsupported_ops_df: pd.DataFrame,
output_files_info: JSONPropertiesContainer) -> QualificationSummary:
# TODO: This method does a lot of critical but unrelated work. Refactor this into smaller steps/methods
# to improve readability and maintainability.
if all_apps.empty:
# No need to run saving estimator or process the data frames.
return QualificationSummary(total_apps=total_apps, tools_processed_apps=all_apps)

unsupported_ops_obj = UnsupportedOpsStageDuration(self.ctxt.get_value('local', 'output',
'unsupportedOperators'))
# Generate the statistics report
try:
stats_report = SparkQualificationStats(ctxt=self.ctxt)
Expand All @@ -303,37 +303,52 @@ def __build_global_report_summary(self,
self.logger.error('Failed to generate the statistics report: %s', e)

# Calculate unsupported operators stage duration before grouping
unsupported_ops_obj = UnsupportedOpsStageDuration(
self.ctxt.get_value('local', 'output', 'unsupportedOperators'))
all_apps = unsupported_ops_obj.prepare_apps_with_unsupported_stages(all_apps, unsupported_ops_df)
apps_pruned_df = self.__remap_columns_and_prune(all_apps)

# Apply additional heuristics to skip apps not suitable for GPU acceleration
heuristics_ob = AdditionalHeuristics(
props=self.ctxt.get_value('local', 'output', 'additionalHeuristics'),
tools_output_dir=self.ctxt.get_rapids_output_folder(),
output_file=output_files_info.get_value('intermediateOutput', 'files', 'heuristics', 'path'))
apps_pruned_df = heuristics_ob.apply_heuristics(apps_pruned_df)
speedup_category_ob = SpeedupCategory(self.ctxt.get_value('local', 'output', 'speedupCategories'))

# Group the applications and recalculate metrics
apps_grouped_df, group_notes = self.__group_apps_by_name(apps_pruned_df)
df_final_result = speedup_category_ob.build_category_column(apps_grouped_df)

# Assign execution engine (Spark/Photon etc.) and speedup categories (Small/Medium/Large) to each application.
# Note: Strategy for speedup categorization will be based on the execution engine of the application.
apps_with_exec_engine_df = self._assign_execution_engine_to_apps(apps_grouped_df)
speedup_category_confs = self.ctxt.get_value('local', 'output', 'speedupCategories')
speedup_category_ob = SpeedupCategory(speedup_category_confs)
df_final_result = speedup_category_ob.build_category_column(apps_with_exec_engine_df)

# Generate the cluster shape report
reshaped_notes = self.__generate_cluster_shape_report()
report_comments = [group_notes] if group_notes else []
if reshaped_notes:
report_comments.append(reshaped_notes)

# Write the final result to the output file
csv_out = output_files_info.get_value('summary', 'path')
if not df_final_result.empty:
self.logger.info('Generating GPU Estimated Speedup: as %s', csv_out)
df_final_result.to_csv(csv_out, float_format='%.2f')

# Add columns for cluster configuration recommendations and tuning configurations to the processed_apps.
recommender = ClusterConfigRecommender(self.ctxt)
df_final_result = recommender.add_cluster_and_tuning_recommendations(df_final_result)
# Merge the total_apps with the processed_apps to get the Event Log
df_final_result = pd.merge(df_final_result, total_apps[['Event Log', 'AppID']],
left_on='App ID', right_on='AppID')

# Write the app metadata
app_metadata_info = output_files_info.get_value('appMetadata')
config_recommendations_info = output_files_info.get_value('configRecommendations')
self._write_app_metadata(df_final_result, app_metadata_info, config_recommendations_info)

# Return the summary
return QualificationSummary(total_apps=total_apps,
tools_processed_apps=df_final_result,
comments=report_comments)
Expand Down Expand Up @@ -595,6 +610,66 @@ def _read_qualification_output_file(self, report_name_key: str, file_format_key:
report_file_path = FSUtil.build_path(self.ctxt.get_rapids_output_folder(), report_file_name)
return pd.read_csv(report_file_path)

def _read_qualification_metric_file(self, file_name: str) -> Dict[str, pd.DataFrame]:
    """
    Read the given metric file from every application folder under the
    qualification tool's raw-metrics output directory.

    Returns a dictionary keyed by application ID whose values are the
    per-application metric DataFrames.
    Example:
        {
            'appId1': pd.DataFrame(...),
            'appId2': pd.DataFrame(...),
        }
    Apps whose metric file cannot be read map to an empty DataFrame so that
    downstream lookups never raise KeyError.
    :param file_name: Name of the metric file to read from each application's folder
    """
    app_metrics: Dict[str, pd.DataFrame] = {}
    metrics_root = self.ctxt.get_metrics_output_folder()
    for app_folder in FSUtil.get_subdirectories(metrics_root):
        app_id = FSUtil.get_resource_name(app_folder)
        metric_path = FSUtil.build_path(app_folder, file_name)
        try:
            app_metrics[app_id] = pd.read_csv(metric_path)
        except Exception as e:  # pylint: disable=broad-except
            # Some apps may not have the given metrics file. Keep an entry for
            # every app (empty frame) so consumers can iterate and look up
            # uniformly without KeyErrors.
            app_metrics[app_id] = pd.DataFrame()
            self.logger.warning('Unable to read metrics file for app %s. Reason - %s:%s',
                                app_id, type(e).__name__, e)
    return app_metrics

def _assign_execution_engine_to_apps(self, tools_processed_apps: pd.DataFrame) -> pd.DataFrame:
    """
    Assigns the execution engine (Spark/Photon) to each application. This will be used to categorize
    applications into speedup categories (Small/Medium/Large).

    :param tools_processed_apps: apps DataFrame; must contain an 'App ID' column
    :return: the same DataFrame with the execution-engine column populated
    """
    spark_properties = self._read_qualification_metric_file('spark_properties.csv')
    default_exec_engine_type = ExecutionEngine.get_default()
    exec_engine_col_name = self.ctxt.get_value('local', 'output', 'speedupCategories', 'execEngineColumnName')

    # Default to Spark-based execution type for non-Databricks platforms
    if self.ctxt.platform.get_platform_name() not in {CspEnv.DATABRICKS_AWS, CspEnv.DATABRICKS_AZURE}:
        tools_processed_apps[exec_engine_col_name] = default_exec_engine_type
        return tools_processed_apps

    # Create a map of App IDs to their execution engine type (Spark/Photon).
    # Photon apps are detected from the Databricks-specific spark version property.
    spark_version_key = 'spark.databricks.clusterUsageTags.sparkVersion'
    exec_engine_map = {}

    for app_id, props_df in spark_properties.items():
        # Apps whose 'spark_properties.csv' was missing/unreadable are stored as
        # empty DataFrames by _read_qualification_metric_file. Utilities.convert_df_to_dict
        # requires exactly two columns, so default such apps to the Spark engine
        # instead of failing its assertion on an empty frame.
        if props_df.empty:
            exec_engine_map[app_id] = default_exec_engine_type
            continue
        props_dict = Utilities.convert_df_to_dict(props_df)
        spark_version = props_dict.get(spark_version_key, '').lower()
        if ExecutionEngine.PHOTON.lower() in spark_version:
            exec_engine_map[app_id] = ExecutionEngine.PHOTON
        else:
            exec_engine_map[app_id] = default_exec_engine_type

    # Assign the execution engine type to each application DataFrame row; apps
    # without a matching metrics entry fall back to the default engine.
    tools_processed_apps[exec_engine_col_name] = (
        tools_processed_apps['App ID'].map(exec_engine_map).fillna(default_exec_engine_type)
    )
    return tools_processed_apps


@dataclass
class QualificationAsLocal(Qualification):
Expand Down
5 changes: 5 additions & 0 deletions user_tools/src/spark_rapids_pytools/rapids/tool_ctxt.py
Original file line number Diff line number Diff line change
Expand Up @@ -215,6 +215,11 @@ def get_rapids_output_folder(self) -> str:
return root_dir
return FSUtil.build_path(root_dir, rapids_subfolder)

def get_metrics_output_folder(self) -> str:
    """Return the path of the raw-metrics subfolder under the RAPIDS tool's output dir."""
    base_dir = self.get_rapids_output_folder()
    metrics_name = self.get_value('toolOutput', 'metricsSubFolder')
    return FSUtil.build_path(base_dir, metrics_name)

def get_log4j_properties_file(self) -> str:
    """Return the configured log4j properties file name for the tool's text output."""
    # NOTE(review): uses the silent lookup variant — presumably tolerates a
    # missing 'log4jFileName' key without raising; confirm against get_value_silent.
    return self.get_value_silent('toolOutput', 'textFormat', 'log4jFileName')

Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
toolOutput:
completeOutput: true
subFolder: rapids_4_spark_qualification_output
metricsSubFolder: raw_metrics
textFormat:
summaryLog:
fileName: rapids_4_spark_qualification_output.log
Expand Down Expand Up @@ -148,6 +149,7 @@ local:
- 'App Name'
- 'Event Log'
- 'Cluster Info'
- 'Execution Engine'
- 'Estimated GPU Speedup Category'
- 'Full Cluster Config Recommendations*'
- 'GPU Config Recommendation Breakdown*'
Expand Down Expand Up @@ -254,27 +256,51 @@ local:
speedupColumnName: 'Estimated GPU Speedup'
categoryColumnName: 'Estimated GPU Speedup Category'
heuristicsColumnName: 'Skip by Heuristics'
categories:
- title: 'Not Applicable'
lowerBound: -1000000.0
upperBound: 1.3
- title: 'Small'
lowerBound: 1.3
upperBound: 2.0
- title: 'Medium'
lowerBound: 2.0
upperBound: 3.0
- title: 'Large'
lowerBound: 3.0
upperBound: 1000000.0
eligibilityConditions:
- columnName: 'Estimated GPU Speedup'
lowerBound: 1.3
upperBound: 1000000.0
- columnName: 'Unsupported Operators Stage Duration Percent'
lowerBound: 0.0
upperBound: 25.0
execEngineColumnName: 'Execution Engine'
defaultCategory: 'Not Recommended'
strategies:
spark: # Spark specific speedup categories
categories:
- title: 'Not Applicable'
lowerBound: -1000000.0
upperBound: 1.3
- title: 'Small'
lowerBound: 1.3
upperBound: 2.0
- title: 'Medium'
lowerBound: 2.0
upperBound: 3.0
- title: 'Large'
lowerBound: 3.0
upperBound: 1000000.0
eligibilityConditions:
- columnName: 'Estimated GPU Speedup'
lowerBound: 1.3
upperBound: 1000000.0
- columnName: 'Unsupported Operators Stage Duration Percent'
lowerBound: 0.0
upperBound: 25.0
photon: # Photon specific speedup categories
categories:
- title: 'Not Applicable'
lowerBound: -1000000.0
upperBound: 1.0
- title: 'Small'
lowerBound: 1.0
upperBound: 2.0
- title: 'Medium'
lowerBound: 2.0
upperBound: 3.0
- title: 'Large'
lowerBound: 3.0
upperBound: 1000000.0
eligibilityConditions:
- columnName: 'Estimated GPU Speedup'
lowerBound: 1.0
upperBound: 1000000.0
- columnName: 'Unsupported Operators Stage Duration Percent'
lowerBound: 0.0
upperBound: 25.0
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This needs some thinking on the impact of design.
This introduces a platform configuration inside the tool's conf. On the other hand, we do have a configuration file per platform.

additionalHeuristics:
appInfo:
fileName: 'application_information.csv'
Expand Down
12 changes: 12 additions & 0 deletions user_tools/src/spark_rapids_tools/enums.py
Original file line number Diff line number Diff line change
Expand Up @@ -210,3 +210,15 @@ def create_default_model_args(cls, model_type: str) -> dict:
'xgboostEnabled': model_type == QualEstimationModel.XGBOOST,
'customModelFile': None,
}


class ExecutionEngine(EnumeratedType):
    """
    Represents the execution engine for the application (Spark or Photon).
    """
    # Values are lowercase strings so they can be substring-matched against a
    # lowercased spark-version property (see Photon detection in the Qual tool).
    SPARK = 'spark'
    PHOTON = 'photon'

    @classmethod
    def get_default(cls) -> 'ExecutionEngine':
        """Return the engine assumed when no engine can be detected (Spark)."""
        return cls.SPARK
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

should we have a class defined per platform? All the platforms will be bound to the default enumType. Then DB extends that definition adding the photon type.

51 changes: 42 additions & 9 deletions user_tools/src/spark_rapids_tools/tools/speedup_category.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,19 +15,44 @@
"""Implementation class for Speedup Category logic."""

from dataclasses import dataclass, field
from typing import Optional
from typing import Optional, Dict

import pandas as pd


class SpeedupStrategy:
    """
    Wraps the per-execution-engine speedup strategy: the category ranges used to
    label applications and the eligibility conditions used to filter recommendations.
    """
    _categories: list
    _eligibility_conditions: list

    def __init__(self, props: dict):
        self._categories, self._eligibility_conditions = (
            props.get('categories', []),
            props.get('eligibilityConditions', []),
        )

    def get_categories(self) -> list:
        """Return the list of category range definitions."""
        return self._categories

    def get_eligibility_conditions(self) -> list:
        """Return the list of eligibility condition definitions."""
        return self._eligibility_conditions


@dataclass
class SpeedupCategory:
    """
    Encapsulates the logic to categorize the speedup values based on the range values.

    The configuration (per-engine strategies, column names, default category) is
    supplied through the `props` dict.
    """
    # Raw configuration properties driving the categorization.
    props: dict = field(default=None, init=True)
    # Maps an execution engine name (e.g. 'spark', 'photon') to its SpeedupStrategy.
    speedup_strategies: Dict[str, SpeedupStrategy] = field(default_factory=dict, init=False)

    def __post_init__(self):
        # Guard against a missing props dict — the field defaults to None, and
        # dereferencing it unguarded would raise AttributeError.
        strategy_properties = (self.props or {}).get('strategies', {})
        # Create a SpeedupStrategy for each execution engine type.
        for exec_engine, properties in strategy_properties.items():  # type: str, dict
            self.speedup_strategies[exec_engine] = SpeedupStrategy(properties)

def __build_category_column(self, all_apps: pd.DataFrame) -> pd.DataFrame:
def _build_category_column(self, all_apps: pd.DataFrame) -> pd.DataFrame:
"""
Build the category column based on the range values of the speedup column.
Example:
Expand All @@ -44,20 +69,24 @@ def __build_category_column(self, all_apps: pd.DataFrame) -> pd.DataFrame:
output: row_2 = pd.Series({'speedup': 3.5, 'speedup category': 'Large'})
reason: Speedup Category will be 'Large' because the speedup is within the range (3-100000).
"""
categories = self.props.get('categories')
category_col_name = self.props.get('categoryColumnName')
speedup_col_name = self.props.get('speedupColumnName')
exec_engine_col_name = self.props.get('execEngineColumnName')

# Calculate the category based on the speedup value
def calculate_category(col_value) -> Optional[str]:
def calculate_category(single_row: pd.Series) -> Optional[str]:
exec_engine = single_row.get(exec_engine_col_name)
# Get the speedup strategy and its categories for the given execution engine type.
categories = self.speedup_strategies.get(exec_engine).get_categories()
col_value = single_row.get(speedup_col_name)
for category in categories:
if category.get('lowerBound') <= col_value < category.get('upperBound'):
return category.get('title')
return None
all_apps[category_col_name] = all_apps[speedup_col_name].apply(calculate_category)
all_apps[category_col_name] = all_apps.apply(calculate_category, axis=1)
return all_apps

def __process_category(self, all_apps: pd.DataFrame) -> pd.DataFrame:
def _process_category(self, all_apps: pd.DataFrame) -> pd.DataFrame:
"""
Process the speedup category column based on the eligibility criteria. If the row does not match
the criteria, the category column will be set to the `Not Recommended` category.
Expand All @@ -76,9 +105,13 @@ def __process_category(self, all_apps: pd.DataFrame) -> pd.DataFrame:
"""
category_col_name = self.props.get('categoryColumnName')
heuristics_col_name = self.props.get('heuristicsColumnName')
exec_engine_col_name = self.props.get('execEngineColumnName')

def process_row(single_row: pd.Series) -> str:
for entry in self.props.get('eligibilityConditions'):
exec_engine = single_row.get(exec_engine_col_name)
# Get the speedup strategy and its eligibility conditions for the given execution engine type.
eligibility_conditions = self.speedup_strategies.get(exec_engine).get_eligibility_conditions()
for entry in eligibility_conditions:
col_value = single_row[entry.get('columnName')]
# If the row is marked to be skipped by heuristics or the value is not within the range,
# set the category to default category (Not Recommended)
Expand All @@ -91,6 +124,6 @@ def process_row(single_row: pd.Series) -> str:
return all_apps

def build_category_column(self, all_apps: pd.DataFrame) -> pd.DataFrame:
    """
    Public entry point: assign a speedup category to every app, then downgrade
    rows failing the eligibility conditions to the default category.
    """
    categorized = self._build_category_column(all_apps)
    return self._process_category(categorized)
10 changes: 10 additions & 0 deletions user_tools/src/spark_rapids_tools/utils/util.py
Original file line number Diff line number Diff line change
Expand Up @@ -349,3 +349,13 @@ def bytes_to_human_readable(cls, num_bytes: int) -> str:
num_bytes /= 1024.0
i += 1
return f'{num_bytes:.2f} {size_units[i]}'

@classmethod
def convert_df_to_dict(cls, df: pd.DataFrame) -> dict:
    """
    Converts a DataFrame with exactly two columns into a dictionary. The first column is used as keys
    and the second column as values. Duplicate keys keep the last occurrence.
    """
    assert len(df.columns) == 2, 'Cannot convert DataFrame to dict, expected 2 columns'
    keys_col, vals_col = df.columns
    return dict(zip(df[keys_col], df[vals_col]))
Original file line number Diff line number Diff line change
Expand Up @@ -41,3 +41,16 @@ Feature: Event Log Processing
Qualification. Raised an error in phase [Execution]
"""
And return code is "1"

@test_id_ELP_0003
Scenario Outline: Qualification tool processes event logs with different execution engines
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: execution engines

Q: what is the behavior if a user input --platform onprem, but with a photon event log? Does this fail in python side early?

Given platform is "<platform>"
When spark-rapids tool is executed with "<event_logs>" eventlogs
Then "app_metadata.json" contains execution engine as "<execution_engines>"
And return code is "0"

Examples:
| platform | event_logs | execution_engines |
| databricks-aws | join_agg_on_yarn_eventlog.zstd | spark |
| databricks-aws | join_agg_on_yarn_eventlog.zstd,photon_eventlog.zstd | spark;photon |
| dataproc | join_agg_on_yarn_eventlog.zstd | spark |
Loading