6 changes: 4 additions & 2 deletions user_tools/src/spark_rapids_pytools/rapids/qualification.py
@@ -29,6 +29,7 @@
from spark_rapids_pytools.common.sys_storage import FSUtil
from spark_rapids_pytools.common.utilities import Utils, TemplateGenerator
from spark_rapids_pytools.rapids.qualification_core import QualificationCore
from spark_rapids_tools.api_v1.builder import APIResultHandler
from spark_rapids_tools.enums import QualFilterApp, QualEstimationModel, SubmissionMode
from spark_rapids_tools.tools.additional_heuristics import AdditionalHeuristics
from spark_rapids_tools.tools.cluster_config_recommender import ClusterConfigRecommender
@@ -537,12 +538,13 @@ def __update_apps_with_prediction_info(self,
model_name = self.ctxt.platform.get_prediction_model_name()
qual_output_dir = self.ctxt.get_csp_output_path()
output_info = self.__build_prediction_output_files_info()
qual_handler = self.ctxt.get_ctxt('qualHandler')
try:
# Build the QualCore handler object to handle the prediction model output
q_core_handler = APIResultHandler().qual_core().with_path(qual_output_dir).build()
predictions_df = predict(platform=model_name, qual=qual_output_dir,
output_info=output_info,
model=estimation_model_args['customModelFile'],
qual_handlers=[qual_handler])
qual_handlers=[q_core_handler])
except Exception as e: # pylint: disable=broad-except
predictions_df = pd.DataFrame()
self.logger.error(
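For reference, a minimal sketch of the handler construction that this hunk (and the prediction.py change below) switches to. The output path is hypothetical; the builder chain mirrors the calls shown in the diff:

from spark_rapids_tools.api_v1.builder import APIResultHandler

# Hypothetical qualification output directory; in the tool this comes from
# self.ctxt.get_csp_output_path().
qual_output_dir = '/tmp/qual_run/qual_core_output'

# Build the QualCore result handler directly from the output path instead of
# pulling a pre-built handler out of the tool context ('qualHandler').
q_core_handler = APIResultHandler().qual_core().with_path(qual_output_dir).build()

# The handler is then forwarded to the prediction entry point:
# predict(..., qual_handlers=[q_core_handler])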
22 changes: 7 additions & 15 deletions user_tools/src/spark_rapids_pytools/rapids/qualx/prediction.py
@@ -18,33 +18,25 @@

from spark_rapids_pytools.common.sys_storage import FSUtil
from spark_rapids_pytools.rapids.qualx.qualx_tool import QualXTool
from spark_rapids_tools.tools.core.qual_handler import QualCoreHandler
from spark_rapids_tools.api_v1 import QualCoreResultHandler
from spark_rapids_tools.api_v1.builder import APIResultHandler
from spark_rapids_tools.tools.qualx.qualx_main import predict
from spark_rapids_tools.tools.qualx.util import print_summary, print_speedup_summary


@dataclass
class Prediction(QualXTool):
"""
Wrapper layer around Prediction Tool.

Attributes
----------
qual_output: str
Path to a directory containing qualification tool output.
qual_handler: QualCoreHandler
Handler for reading qualification core tool results.
A wrapper to run the QualX prediction stage on an existing Qual's output.
:param qual_output: Path to the directory containing the qualification tool output.
"""
qual_output: str = None
qual_handler: QualCoreHandler = None

name = 'prediction'

def __post_init__(self):
"""Initialize the QualCoreHandler from qual_output."""
super().__post_init__()
if self.qual_output is not None:
self.qual_handler = QualCoreHandler(result_path=self.qual_output)
@property
def qual_handler(self) -> QualCoreResultHandler:
return APIResultHandler().qual_core().with_path(self.qual_output).build()

def __prepare_prediction_output_info(self) -> dict:
"""
30 changes: 25 additions & 5 deletions user_tools/src/spark_rapids_tools/api_v1/app_handler.py
@@ -16,7 +16,7 @@

from dataclasses import dataclass, field
from functools import cached_property
from typing import Optional
from typing import Optional, List

import pandas as pd

@@ -57,17 +57,37 @@ def uuid(self) -> str:
"""
return self._app_id

def patch_into_df(self, df: pd.DataFrame) -> pd.DataFrame:
def patch_into_df(self,
df: pd.DataFrame,
col_names: Optional[List[str]] = None) -> pd.DataFrame:
"""
Given a dataframe, this method stitches the app_id and app_name into the dataframe.
This is useful for automatically adding the app-id/app-name columns to a dataframe.
:param df: the dataframe that we want to modify.
:param col_names: optional list of column names that defines which columns receive the app_id
and app_name values. It is assumed that the list is given in the order in which
the columns should be inserted.
:return: the resulting dataframe after adding the columns.
"""
# TODO: We should consider adding the UUID as well, and use that for the joins instead.
# append attempt_id to support multiple attempts
col_values = [self.app_id]
Collaborator:
This feels weird that col_values is a fixed/default list of one value, while col_names can be a user-provided list with multiple values. Also, I'm not sure how I would patch app_name per the docstring. And what would happen if I passed in col_names=['one', 'two', 'three']? Seems like I'd get three columns with the same (appId) value?

Collaborator Author:
Good point!
I believe that a dev intending to stitch DFs using Apps (the default combiner) has to use a column name that can be mapped to the object; otherwise, a custom combiner is needed. For example:

  • [appId, app_id, App ID, App Id, ...] -> all of these variations can be reduced to the app._app_id field; otherwise, there is no way to tell which column name should receive the value of app._app_id
  • [app_name, appName, App Name, ...] -> similarly, all of these can be mapped to the app._app_name field

if col_names is None:
# append attemptId to support multi-attempts
col_names = ['appId']
Copilot AI (Aug 4, 2025):
The zip operation will only iterate over min(len(col_names), len(col_values)) items. Since col_values always has length 1 but col_names can have different lengths, this could skip columns or fail silently.

Suggested change
col_names = ['appId']
col_names = ['appId']
# Ensure col_values matches col_names in length
if len(col_values) == 1 and len(col_names) > 1:
col_values = col_values * len(col_names)
elif len(col_values) != len(col_names):
raise ValueError("Length of col_values must be 1 or match length of col_names")

# Ensure col_values matches col_names in length
if len(col_values) == 1 and len(col_names) > 1:
col_values = col_values * len(col_names)
elif len(col_values) != len(col_names):
raise ValueError('Length of col_values must be 1 or match length of col_names')
if not df.empty:
# TODO: We should consider adding the UUID as well, and use that for the joins instead.
df.insert(0, 'attemptId', self._attempt_id)
df.insert(0, 'appId', self._app_id)
for col_k, col_v in zip(reversed(col_names), reversed(col_values)):
if col_k not in df.columns:
df.insert(0, col_k, col_v)
else:
# if the column already exists, we should not overwrite it
# this is useful when we want to patch the app_id/app_name to an existing dataframe
df[col_k] = col_v
return df

@property
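To make the col_names semantics discussed in the thread above concrete, a hedged usage sketch. The result_handler, app_id, and the sample dataframe are assumptions; patch_into_df and app_handlers are the APIs from this diff:

import pandas as pd

# Assume `result_handler` is a built result handler and `app_id` is a known application id.
app_entry = result_handler.app_handlers.get(app_id)

per_app_df = pd.DataFrame({'SQL ID': [1, 2], 'Duration': [100, 250]})

# Default: with no col_names, the app id is patched in under the 'appId' column.
patched = app_entry.patch_into_df(per_app_df)

# Custom column name: the caller picks the label that downstream joins expect.
patched_custom = app_entry.patch_into_df(per_app_df, col_names=['App ID'])

# Note: per the length-matching logic above, passing several names would replicate
# the single app-id value across all of them.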
134 changes: 134 additions & 0 deletions user_tools/src/spark_rapids_tools/api_v1/builder.py
@@ -92,6 +92,11 @@ def rep_reader(self) -> 'ToolReportReaderT':
raise ValueError(f'No reader found for table: {self._tbl}')
return reader

@property
def tbl(self) -> str:
"""Get the table id."""
return self._tbl

@property
def is_per_app_tbl(self) -> bool:
if self._tbl is None:
@@ -206,6 +211,135 @@ def _load_single_app(self) -> LoadDFResult:
)


@dataclass
class CSVCombiner(object):
"""A class that combines multiple CSV reports into a single report."""
rep_builder: CSVReport
_failed_app_processor: Optional[Callable[[str, LoadDFResult], None]] = field(default=None, init=False)
_success_app_processor: Optional[Callable[[ToolResultHandlerT, str, pd.DataFrame, dict], pd.DataFrame]] = (
field(default=None, init=False))
_combine_args: Optional[dict] = field(default=None, init=False)

@property
def result_handler(self) -> ToolResultHandlerT:
"""Get the result handler associated with this combiner."""
return self.rep_builder.handler

@staticmethod
def default_success_app_processor(result_handler: ToolResultHandlerT,
app_id: str,
df: pd.DataFrame,
combine_args: dict) -> pd.DataFrame:
"""Default processor for successful applications."""
col_names = None
app_entry = result_handler.app_handlers.get(app_id)
if not app_entry:
raise ValueError(f'App entry not found for ID: {app_id}')
if combine_args:
# check if the col_names are provided to stitch the app_ids
col_names = combine_args.get('col_names', None)
Collaborator:
Wonder if we should rename col_names here, since it sounds like a "selection" list, but it's actually a "patch" list.

Collaborator Author:
good point.

if col_names:
# patch the app_uuid and if the columns are defined.
return app_entry.patch_into_df(df, col_names=col_names)
return df

Collaborator:
Should probably add a default_failed_app_processor which just logs something like "failed to combine: {app_id}". This would provide a useful example (and indirect documentation) for the callback API, e.g. the str argument will be the appId.

Collaborator Author:
Good suggestion.
We can also consider dumping them in a single log-message in order to improve readability of the logs, right?
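A possible shape for such a default, matching the Callable[[str, LoadDFResult], None] signature accepted by process_failed (illustrative only; the logger name is an assumption and nothing below exists in this diff):

import logging

_LOGGER = logging.getLogger('rapids.tools.csv_combiner')  # hypothetical logger name


def default_failed_app_processor(app_id: str, app_res: 'LoadDFResult') -> None:
    """Illustrative default: log the failure and continue without raising."""
    _LOGGER.warning('failed to combine results for app %s: %s', app_id, app_res.load_error)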

def _evaluate_args(self) -> None:
"""Evaluate the arguments to ensure they are set correctly."""

if self._success_app_processor is None:
# set the default processor for successful applications
self._success_app_processor = self.default_success_app_processor
# TODO: we should fail if the combiner is built for AppIds but columns are not defined.

def _create_empty_df(self) -> pd.DataFrame:
"""
creates an empty DataFrame with the columns defined in the report builder.
:return: an empty dataframe.
"""
empty_df = self.result_handler.create_empty_df(self.rep_builder.tbl)
if self._combine_args and 'use_cols' in self._combine_args:
# make sure that we insert the columns to the empty dataframe
injected_cols = pd.DataFrame(columns=self._combine_args['use_cols'])
return pd.concat([injected_cols, empty_df], axis=1)
Collaborator:
Should this also add columns from 'col_names' as well (or is that a subset of 'use_cols')?

Collaborator Author:
Yes, this was a typo; it should be col_names.
However, this reminded me that the empty df should also account for cases where the CSVReport had arguments to rename columns.

return empty_df

################################
# Setters/Getters for processors
################################

def process_failed(self,
processor: Callable[[str, LoadDFResult], None]) -> 'CSVCombiner':
"""Set the processor for failed applications."""
self._failed_app_processor = processor
return self

def process_success(self,
cb_fn: Callable[[ToolResultHandlerT, str, pd.DataFrame, dict], pd.DataFrame]) -> 'CSVCombiner':
"""Set the processor for successful applications."""
self._success_app_processor = cb_fn
return self
Collaborator:
For consistency, I'd settle on either processor or cb_fn arg for both of these, unless there was a good reason for this distinction.

Collaborator Author:
Fair point.


def combine_args(self, args: dict) -> 'CSVCombiner':
"""Set the arguments for combining the reports."""
self._combine_args = args
return self

def on_apps(self) -> 'CSVCombiner':
"""specify that the combiner inject AP UUID to the individual results before the concatenation."""
self.process_success(self.default_success_app_processor)
return self

#########################
# Public Interfaces
#########################

def build(self) -> LoadDFResult:
"""Build the combined CSV report."""
# process the arguments to ensure they are set correctly
self._evaluate_args()

load_error = None
final_df = None
success = False
try:
per_app_res = self.rep_builder.load()
# this is a dictionary; loop over its entries one by one to combine them
combined_dfs = []
for app_id, app_res in per_app_res.items():
# we need to patch the app_id to the dataframe
if app_res.load_error or app_res.data.empty:
# process entries with failed results, or skip them if no handler is defined.
if self._failed_app_processor:
self._failed_app_processor(app_id, app_res)
else:
# default behavior is to skip the app
continue
Collaborator:
If we have a default failed_app_processor, then can simplify this block.

Collaborator:
Also, can failed_app_processor raise an exception? Might want to document expected behavior.

Collaborator Author:
Great attention to detail +1

else:
# process entry with successful results
app_df = self._success_app_processor(self.result_handler,
app_id,
app_res.data,
self._combine_args)
# Q: Should we ignore or skip the empty dataframes?
combined_dfs.append(app_df)
if combined_dfs:
# only concatenate if we have any dataframes to combine
final_df = pd.concat(combined_dfs, ignore_index=True)
else:
# create an empty DataFrame if no data was collected. uses the table schema.
final_df = self._create_empty_df()
success = True
except Exception as e: # pylint: disable=broad-except
# handle any exceptions that occur during the combination phase
load_error = e
return LoadDFResult(
f_path='combination of multiple paths for table: ' + self.rep_builder.tbl,
data=final_df,
success=success,
fallen_back=False,
load_error=load_error)


@dataclass
class JPropsReport(APIReport[JPropsResult]):
"""A report that loads data in JSON properties format."""
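Putting the new combiner together, a hedged end-to-end sketch (assuming csv_rep is a CSVReport already built for a per-app table; only the combiner calls below are introduced by this diff):

# Assume `csv_rep` is a CSVReport built elsewhere for a per-app table.
combined_res = (CSVCombiner(rep_builder=csv_rep)
                .on_apps()                               # patch the app id into each per-app frame
                .combine_args({'col_names': ['appId']})  # column that receives the app id
                .build())

if combined_res.success:
    combined_df = combined_res.data   # one concatenated DataFrame across all apps
else:
    print(f'combination failed: {combined_res.load_error}')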
5 changes: 5 additions & 0 deletions user_tools/src/spark_rapids_tools/api_v1/core/qual_handler.py
@@ -15,10 +15,12 @@
"""Module that contains the definition of the qualification Result handler for the core module."""

from dataclasses import dataclass
from typing import Optional

from spark_rapids_tools import override
from spark_rapids_tools.api_v1 import register_result_class, ResultHandler
from spark_rapids_tools.api_v1.report_reader import ToolReportReader
from spark_rapids_tools.storagelib.cspfs import BoundedCspPath


@register_result_class('qualCoreOutput')
@@ -29,3 +31,6 @@ class QualCoreResultHandler(ResultHandler):
@property
def alpha_reader(self) -> ToolReportReader:
return self.readers.get(self.report_id)

def get_raw_metrics_path(self) -> Optional[BoundedCspPath]:
return self.get_reader_path('coreRawMetrics')
22 changes: 22 additions & 0 deletions user_tools/src/spark_rapids_tools/api_v1/result_handler.py
@@ -132,6 +132,17 @@ def get_reader_by_tbl(self, tbl: str) -> Optional[ToolReportReader]:
# Public Interfaces
#########################

def get_reader_path(self, report_id: str) -> Optional[BoundedCspPath]:
"""
Get the path to the report file for the given report ID.
:param report_id: The unique identifier for the report.
:return: The path to the report file, or None if not found.
"""
reader = self.readers.get(report_id)
if reader:
return reader.out_path
return None

def create_empty_df(self, tbl: str) -> pd.DataFrame:
"""
Create an empty DataFrame for the given table label.
@@ -153,6 +164,17 @@ def get_table_path(self, table_label: str) -> Optional[BoundedCspPath]:
return reader.get_table_path(table_label)
return None

def is_empty(self) -> bool:
"""
Check if the result handler has no data.
:return: True if the result handler is empty, False otherwise.
"""
# first check that the output file exists
if not self.out_path.exists():
return True
# then check that the app_handlers are empty
return not self.app_handlers

#########################
# Type Definitions
#########################
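A short hedged sketch of the two new result-handler helpers. The output path is hypothetical; the builder chain and the 'coreRawMetrics' report id come from this PR:

from spark_rapids_tools.api_v1.builder import APIResultHandler

handler = APIResultHandler().qual_core().with_path('/tmp/qual_core_output').build()

# Skip downstream processing when the output directory is missing or has no apps.
if handler.is_empty():
    raise RuntimeError('no qualification results to process')

# Resolve a report directory by its report id; QualCoreResultHandler wraps this
# lookup in get_raw_metrics_path().
raw_metrics_path = handler.get_reader_path('coreRawMetrics')
if raw_metrics_path is not None:
    print(f'raw metrics at: {raw_metrics_path}')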
12 changes: 6 additions & 6 deletions user_tools/src/spark_rapids_tools/tools/qualx/preprocess.py
@@ -460,14 +460,14 @@ def _is_ignore_no_perf(action: str) -> bool:
| node_level_supp['Action'].apply(_is_ignore_no_perf)
| node_level_supp['Exec Name']
.astype(str)
# TODO: revisit the need to check for 'WholeStageCodegen' in Exec Name.
# Ideally, we want to remove those execs that should be dropped from the analysis (
# e.g. WholeStageCodegen, WholeStageCodegenExec, etc.)
.apply(lambda x: x.startswith('WholeStageCodegen'))
)
node_level_supp = (
node_level_supp[['App ID', 'SQL ID', 'SQL Node Id', 'Exec Is Supported']]
.groupby(['App ID', 'SQL ID', 'SQL Node Id'])
.agg('all')
.reset_index(level=[0, 1, 2])
)
# in a previous version we used to group by 'App ID', 'SQL ID', and 'SQL Node Id', but this is no
# longer needed since the 3 keys form a unique id for each row.
node_level_supp = node_level_supp[['App ID', 'SQL ID', 'SQL Node Id', 'Exec Is Supported']]
return node_level_supp

