diff --git a/.gitignore b/.gitignore index bf6ed52..b7513f1 100644 --- a/.gitignore +++ b/.gitignore @@ -1,2 +1,4 @@ results/* !results/*.gitkeep +.DS_Store +.idea diff --git a/README.md b/README.md index 9b4e7a3..d6cce84 100644 --- a/README.md +++ b/README.md @@ -11,14 +11,17 @@ ### Run Sizey -1. Create a Python virtual environment and install the dependencies -2. Run `python3 main.py filename alpha softmax error_metric seed` +1. Create a Python virtual environment and install the dependencies +2. Run `python3 main.py filename alpha softmax error_metric seed [--use_online_grid]` + +Where: - `filename` describes the workflow from the data folder. For instance `./data/trace_methylseq.csv` - `alpha` sets the alpha you want to execute Sizey with. It has to be between 0.0 and 1.0 -- `interpolation` actives the interpolation strategy. It is either False or True. If set to False, the Argmax strategy is used. +- `softmax` toggles the softmax ensemble strategy. Set to `True` to use it, otherwise `False` for the argmax strategy. - `error_metric` defines the XYZ used for ABC. Currently, it is either `smoothed_mape` or `neg_mean_squared_error` whereas `smoothed_mape` should be used and other error metrics might be experimental and change the impact on the RAQ score. - `seed` defines the seed for splitting up the initial data in training and test data and also defines the order of online task input. +- `--use_online_grid` (optional) toggles the online grid search. Here is an example command: `./data/trace_methylseq.csv 0.0 True smoothed_mape 1996` diff --git a/approach/abstract_predictor.py b/approach/abstract_predictor.py index a67c1fe..054d958 100644 --- a/approach/abstract_predictor.py +++ b/approach/abstract_predictor.py @@ -6,8 +6,33 @@ class PredictionModel(metaclass=ABCMeta): + """ + PredictionModel is an abstract base class for implementing machine learning models for regression tasks. 
+ + This class provides a structure for defining methods related to model training, prediction, and updating. + It includes attributes for storing training data, scalers, and error metrics, and requires subclasses to + implement specific methods for model functionality. + + Attributes: + workflow_name (str): Name of the workflow this model is associated with. + task_name (str): Name of the task this model is associated with. + err_metr (str): Error metric used for model evaluation (e.g., 'smoothed_mape'). + regressor (Optional[object]): The machine learning model used for predictions. + train_X_scaler (Optional[object]): Scaler for normalizing training features. + train_y_scaler (Optional[object]): Scaler for normalizing training labels. + X_train_full (Optional[np.ndarray]): Historical training features. + y_train_full (Optional[np.ndarray]): Historical training labels. + model_error (Optional[float]): Error score of the model. + """ def __init__(self, workflow_name: str, task_name: str, err_metr: str): + """ + Initializes the PredictionModel with workflow name, task name, and error metric. + + :param workflow_name: Name of the workflow this model is associated with. + :param task_name: Name of the task this model is associated with. + :param err_metr: Error metric to be used for model evaluation (e.g., 'smoothed_mape'). + """ self.workflow_name = workflow_name self.task_name = task_name self.err_metr = err_metr @@ -18,16 +43,54 @@ def __init__(self, workflow_name: str, task_name: str, err_metr: str): self.y_train_full = None self.model_error = None + def _ensure_column_vector(self, array_like: Union[pd.Series, np.ndarray, list]) -> np.ndarray: + """ + Converts the input array-like object into a numpy array shaped as a column vector. + + This method ensures that the input, whether it is a pandas Series, a 1-D numpy array, or a list, + is transformed into a two-dimensional numpy array with a single column. 
This format is required + by scikit-learn for certain operations. + + :param array_like: Input data to be converted. Can be a pandas Series, numpy array, or list. + :return: A numpy array reshaped into a column vector (2-D array with shape (n, 1)). + """ + arr = np.asarray(array_like) + if arr.ndim == 1: + arr = arr.reshape(-1, 1) + return arr + def initial_model_training(self, X_train, y_train) -> None: + """ + Initializes the model with training data. + + :param X_train: Training features. + :param y_train: Training labels. + """ raise NotImplementedError('Model prediction method has not been implemented.') def predict_task(self, task_features: pd.Series) -> np.ndarray: + """ + Predicts the output for a single task based on its features. + + :param task_features: Features of the task to predict. + """ raise NotImplementedError('Model prediction method has not been implemented.') def predict_tasks(self, taskDataframe: pd.DataFrame) -> float: + """ + Predicts the output for multiple tasks based on their features. + + :param taskDataframe: DataFrame containing features of multiple tasks. + """ raise NotImplementedError('Predicting multiple tasks has not been implemented.') def update_model(self, X_train: pd.Series, y_train: float) -> None: + """ + Updates the model with new training data. + + :param X_train: New training features. + :param y_train: New training labels. + """ raise NotImplementedError('Model update method has not been implemented.') @@ -35,13 +98,40 @@ def update_model(self, X_train: pd.Series, y_train: float) -> None: class PredictionMethod(metaclass=ABCMeta): def predict(self, X_test, y_test, user_estimate): + """ + Predicts the output for given test data and user estimate. + + :param X_test: Test features. + :param y_test: Test labels. + :param user_estimate: User's estimate for the task. 
+ """ raise NotImplementedError('Model prediction method has not been implemented.') def update_model(self, X_train: pd.Series, y_train: float): + """ + Updates the model with new training data. + + :param X_train: New training features. + :param y_train: New training labels. + """ raise NotImplementedError('Model prediction method has not been implemented.') def handle_underprediction(self, input_size: float, predicted: float, user_estimate: float, retry_number: int, actual_memory: float): + """ + Handles underprediction scenarios by adjusting the predicted value. + + :param input_size: Size of the input task. + :param predicted: Predicted value from the model. + :param user_estimate: User's estimate for the task. + :param retry_number: Number of retries attempted. + :param actual_memory: Actual memory used for the task. + """ raise NotImplementedError('Model prediction method has not been implemented.') def get_number_subModels(self) -> dict[str, int]: + """ + Returns the number of sub-models used in the prediction method. + + :return: Dictionary with model names as keys and their counts as values. + """ raise NotImplementedError('Model prediction method has not been implemented.') diff --git a/approach/experiment_constants.py b/approach/experiment_constants.py index fe52bd7..03a1ccd 100644 --- a/approach/experiment_constants.py +++ b/approach/experiment_constants.py @@ -2,11 +2,28 @@ class ERROR_STRATEGY(Enum): + """ + Defines the strategy for handling errors in the experiment. + + Attributes: + DOUBLE (int): Strategy to double the error value. + MAX_EVER_OBSERVED (int): Strategy to use the maximum error value ever observed. + """ DOUBLE = 1 MAX_EVER_OBSERVED = 2 class OFFSET_STRATEGY(Enum): + """ + Defines the strategy for handling offsets in the experiment. + + Attributes: + STD (int): Strategy to use the standard deviation. + MED_UNDER (int): Strategy to use the median of a subset of data. + MED_ALL (int): Strategy to use the median of all data. 
+ STDUNDER (int): Strategy to use a modified standard deviation. + DYNAMIC (int): Strategy to dynamically adjust the offset. + """ STD = 1 MED_UNDER = 2 MED_ALL = 5 diff --git a/approach/helper.py b/approach/helper.py index 95fd979..a58bc08 100644 --- a/approach/helper.py +++ b/approach/helper.py @@ -1,42 +1,92 @@ import csv import os +import logging +from typing import Iterable, List + + +def log_if_verbose(msg: str) -> None: + """Log a debug message if logging is configured for it.""" + logging.debug(msg) + + +def nth_deltas(values: Iterable[float], step: int, limit: int) -> List[float]: + """Return ``step``-lagged deltas over the last ``limit`` elements.""" + vals = list(values) + deltas: List[float] = [] + start = max(0, len(vals) - limit - step) + for i in range(start, len(vals) - step): + deltas.append(vals[i + step] - vals[i]) + return deltas def write_result_to_csv(method_name: str, error_strategy: str, offset_strategy: str, taskname: str, - wastage_in_bytes: str, - wastage_mb: str, wastage_gb: str, wastage_mbh: str, wastage_gbh: str, failures: int, + wastage_gb: str, wastage_gbh: str, failures: int, runtimes_task: float, number_test: int, workflow: str, runtime_exp: float, maq: float, alpha: float, use_softmax: bool, - models: dict[str, int], error_metric: str, accuracy: str, seed: int): - if not (os.path.exists(get_file_path(workflow, alpha, use_softmax, error_metric, seed))): - with open(get_file_path(workflow, alpha, use_softmax, error_metric,seed), 'a', newline='\n') as csvfile: + models: dict[str, int], error_metric: str, accuracy: str, seed: int, use_online_grid: bool): + """ + Writes the results of a workflow execution to a CSV file. + + :param method_name: Name of the method used. + :param error_strategy: Strategy used for error handling. + :param offset_strategy: Strategy used for offset handling. + :param taskname: Name of the task. + :param wastage_gb: Wastage in gigabytes. + :param wastage_gbh: Wastage in gigabytes per hour. 
+ :param failures: Number of failures encountered. + :param runtimes_task: Total runtime of the tasks. + :param number_test: Number of tests conducted. + :param workflow: Name of the workflow. + :param runtime_exp: Experimental runtime. + :param maq: Mean Absolute Quantile. + :param alpha: Alpha value used in the experiment. + :param use_softmax: Boolean indicating whether softmax was used. + :param models: Dictionary containing the number of sub-models used. + :param error_metric: Error metric used for evaluation. + :param accuracy: Accuracy of the method. + :param seed: Random seed used for reproducibility. + """ + if not (os.path.exists(get_file_path(workflow, alpha, use_softmax, error_metric, seed, use_online_grid))): + with open(get_file_path(workflow, alpha, use_softmax, error_metric,seed, use_online_grid), 'a', newline='\n') as csvfile: writer = csv.writer(csvfile, delimiter=',') writer.writerow( - ["Method", "Error_Strategy", "Offset_Strategy", "Task_Name", "Wastage_Bytes", "Wastage_MB", - "Wastage_GB", "Wastage_MBh", "Wastage_GBh", "Failures", "Runtime_Tasks", "Number_Tests", "Workflow", - "RuntimeExp", "MAQ", "Alpha", "Use_softmax", "Models", "Error_Metric", "Accuracy", "Seed"]) + ["Method", "Error_Strategy", "Offset_Strategy", "Task_Name", "Wastage_GB", "Wastage_GBh", + "Failures", "Runtime_Tasks", "Number_Tests", "Workflow", "RuntimeExp", "MAQ", "Alpha", + "Use_softmax", "Models", "Error_Metric", "Accuracy", "Seed"]) - with open(get_file_path(workflow, alpha, use_softmax, error_metric, seed), 'a', newline='\n') as csvfile: + with open(get_file_path(workflow, alpha, use_softmax, error_metric, seed, use_online_grid), 'a', newline='\n') as csvfile: writer = csv.writer(csvfile, delimiter=',') writer.writerow( - [method_name, error_strategy, offset_strategy, taskname, wastage_in_bytes, - wastage_mb, wastage_gb, wastage_mbh, wastage_gbh, failures, runtimes_task, + [method_name, error_strategy, offset_strategy, taskname, wastage_gb, wastage_gbh, failures, 
runtimes_task, number_test, workflow, runtime_exp, maq, alpha, use_softmax, models, error_metric, accuracy, seed]) def check_substring_in_csv(workflow, alpha, use_softmax, error_metric, method_name, taskname, offset_strategy, - error_strategy, seed): - if not os.path.exists(get_file_path(workflow, alpha, use_softmax, error_metric, seed)): + error_strategy, seed, use_online_grid): + """ + Checks if a specific substring exists in the CSV file corresponding to the given parameters. + + :param workflow: Name of the workflow. + :param alpha: Alpha value used in the experiment. + :param use_softmax: Boolean indicating whether softmax was used. + :param error_metric: Error metric used for evaluation. + :param method_name: Name of the method used. + :param taskname: Name of the task. + :param offset_strategy: Strategy used for offset handling. + :param error_strategy: Strategy used for error handling. + :param seed: Random seed used for reproducibility. + + :return: True if the substring exists in the CSV file, False otherwise. 
+ """ + if not os.path.exists(get_file_path(workflow, alpha, use_softmax, error_metric, seed, use_online_grid)): return False - with open(get_file_path(workflow, alpha, use_softmax, error_metric, seed), newline='') as csvfile: + with open(get_file_path(workflow, alpha, use_softmax, error_metric, seed, use_online_grid), newline='') as csvfile: reader = csv.reader(csvfile) for row in reader: # Ensure the row has enough columns substring = method_name + "," + error_strategy + "," + offset_strategy + "," + taskname + "," - print(substring) - print(row) if (method_name in row[0]) & (error_strategy in row[1]) & (offset_strategy in row[2]) & ( taskname in row[3]): return True @@ -47,6 +97,26 @@ def write_single_task_to_csv(method_name: str, error_strategy: str, offset_strat error_metric: str, use_softmax: bool, taskname: str, wastage_gbh: int, prediction_list: list, actual_memory: str, raw_predictions: float, failures: int, alpha: float, task_runtime: int, experimental_time: float, seed: int): + """ + Writes the results of a single task execution to a CSV file. + + :param method_name: Name of the method used. + :param error_strategy: Strategy used for error handling. + :param offset_strategy: Strategy used for offset handling. + :param workflow: Name of the workflow. + :param error_metric: Error metric used for evaluation. + :param use_softmax: Boolean indicating whether softmax was used. + :param taskname: Name of the task. + :param wastage_gbh: Wastage in gigabytes per hour. + :param prediction_list: List of predictions made by the method. + :param actual_memory: Actual memory used during the task. + :param raw_predictions: Raw predictions made by the method. + :param failures: Number of failures encountered during the task. + :param alpha: Alpha value used in the experiment. + :param task_runtime: Runtime of the task in milliseconds. + :param experimental_time: Experimental time taken for the task. + :param seed: Random seed used for reproducibility. 
+ """ if not (os.path.exists(get_file_path_tasks(workflow, alpha, use_softmax, error_metric, seed))): with open(get_file_path_tasks(workflow, alpha, use_softmax, error_metric, seed), 'a', newline='\n') as csvfile: writer = csv.writer(csvfile, delimiter=',') @@ -62,31 +132,92 @@ def write_single_task_to_csv(method_name: str, error_strategy: str, offset_strat wastage_gbh, task_runtime, prediction_list, actual_memory, raw_predictions, failures, experimental_time, seed]) -def get_file_path(workflow: str, alpha: float, use_softmax: bool, error_metric: str, seed: int): +def get_file_path(workflow: str, alpha: float, use_softmax: bool, error_metric: str, seed: int, use_online_grid: bool): + """ + Constructs the file path for storing results based on the workflow, alpha value, softmax usage, + error metric, and seed. + + :param workflow: Name of the workflow. + :param alpha: Alpha value used in the experiment. + :param use_softmax: Boolean indicating whether softmax was used. + :param error_metric: Error metric used for evaluation. + :param seed: Random seed used for reproducibility. + + :return: A string representing the file path for storing results. + """ return './results/results_sizey_' + workflow + '_' + str(alpha) + '_' + str( - use_softmax) + '_' + error_metric + '_' + str(seed) + '.csv' + use_softmax) + '_' + error_metric + '_' + str(seed) + '_' + str(use_online_grid) + '.csv' def get_file_path_tasks(workflow: str, alpha: float, use_softmax: bool, error_metric: str, seed: int): + """ + Constructs the file path for storing task-specific results based on the workflow, alpha value, + softmax usage, error metric, and seed. + + :param workflow: Name of the workflow. + :param alpha: Alpha value used in the experiment. + :param use_softmax: Boolean indicating whether softmax was used. + :param error_metric: Error metric used for evaluation. + :param seed: Random seed used for reproducibility. 
+ + :return: A string representing the file path for storing task-specific results. + """ return './results/results_sizey_' + workflow + '_' + str(alpha) + '_' + str( use_softmax) + '_' + error_metric + '_' + str(seed) + '_tasks.csv' def byte_to_mb(byte_value: int): + """ + Converts a value in bytes to megabytes. + + :param byte_value: Value in bytes to be converted. + + :return: Value in megabytes. + """ return byte_value * 0.000001 def byte_to_gigabyte(byte_value: int): + """ + Converts a value in bytes to gigabytes. + + :param byte_value: Value in bytes to be converted. + + :return: Value in gigabytes. + """ return byte_value * 0.000000001 def ms_to_h(runtime_in_ms: int): + """ + Converts a runtime in milliseconds to hours. + + :param runtime_in_ms: Runtime in milliseconds to be converted. + + :return: Runtime in hours. + """ return runtime_in_ms / 3600000.0 def byte_and_time_to_mbh(byte_value: int, runtime_in_ms: int): + """ + Multiplies a size in bytes by a runtime in milliseconds and expresses the product in megabyte-hours (MB*h). + + :param byte_value: Size in bytes; converted to megabytes before multiplying. + :param runtime_in_ms: Runtime in milliseconds; converted to hours before multiplying. + + :return: Size-time product in megabyte-hours (megabytes multiplied by hours, not a rate). + """ return byte_to_mb(byte_value) * ms_to_h(runtime_in_ms) def byte_and_time_to_gbh(byte_value: int, runtime_in_ms: int): + """ + Multiplies a size in bytes by a runtime in milliseconds and expresses the product in gigabyte-hours (GB*h). + + :param byte_value: Size in bytes; converted to gigabytes before multiplying. + :param runtime_in_ms: Runtime in milliseconds; converted to hours before multiplying. + + :return: Size-time product in gigabyte-hours (gigabytes multiplied by hours, not a rate). + """ return byte_to_gigabyte(byte_value) * ms_to_h(runtime_in_ms) diff --git a/approach/kmeans_predictor.py b/approach/kmeans_predictor.py index c60eac1..b54fcca 100644 --- a/approach/kmeans_predictor.py +++ b/approach/kmeans_predictor.py @@ -7,35 +7,74 @@ class ClusteringPredictor(PredictionModel): + """ + ClusteringPredictor is a class that implements a machine learning model for regression tasks using clustering techniques. 
+ + This class extends the `PredictionModel` base class and provides methods for training, predicting, and updating + the clustering model. It uses MiniBatchKMeans for clustering and includes functionality for scaling features and labels. + + Attributes: + X_train_full (np.ndarray): Historical training features. + y_train_full (np.ndarray): Historical training labels. + train_X_scaler (MinMaxScaler): Scaler for normalizing training features. + train_y_scaler (MinMaxScaler): Scaler for normalizing training labels. + regressor (MiniBatchKMeans): Trained clustering model. + """ def initial_model_training(self, X_train, y_train) -> None: + """ + Initializes the model with training data using MiniBatchKMeans clustering. + :param X_train: Training features. + :param y_train: Training labels. + """ self.train_X_scaler = MinMaxScaler() self.train_y_scaler = MinMaxScaler() # Scale Features - X_train_scaled = self.train_X_scaler.fit_transform(X_train) - y_train_scaled = self.train_y_scaler.fit_transform(y_train.values.reshape(-1, 1)) + X_train_scaled = self.train_X_scaler.fit_transform(self._ensure_column_vector(X_train)) + y_train_scaled = self.train_y_scaler.fit_transform(self._ensure_column_vector(y_train)) # Initialize internal storage of historical values - self.X_train_full = X_train - self.y_train_full = y_train.values.reshape(-1, 1) + self.X_train_full = self._ensure_column_vector(X_train) + self.y_train_full = self._ensure_column_vector(y_train) self.regressor = MiniBatchKMeans().fit(X_train_scaled, y_train_scaled) def predict_task(self, task_features: pd.Series) -> float: - task_features_scaled = self.train_X_scaler.transform(task_features.values.reshape(-1, 1)) - return self.train_y_scaler.inverse_transform(self.regressor.predict(task_features_scaled).reshape(-1, 1)) + """ + Predicts the output for a single task based on its features using the trained clustering model. + + :param task_features: Features of the task to predict. 
+ + :return: Predicted value for the task. + """ + task_features_scaled = self.train_X_scaler.transform(self._ensure_column_vector(task_features)) + preds = self.regressor.predict(task_features_scaled) + return self.train_y_scaler.inverse_transform(self._ensure_column_vector(preds)) def predict_tasks(self, taskDataframe: pd.DataFrame) -> np.ndarray: - taskDataframe_scaled = self.train_X_scaler.transform(taskDataframe) + """ + Predicts the output for multiple tasks based on their features using the trained clustering model. + + :param taskDataframe: DataFrame containing features of multiple tasks. - return self.train_y_scaler.inverse_transform(self.regressor.predict(taskDataframe_scaled).reshape(-1, 1)) + :return: Array of predicted values for the tasks. + """ + taskDataframe_scaled = self.train_X_scaler.transform(taskDataframe) + preds = self.regressor.predict(taskDataframe_scaled) + return self.train_y_scaler.inverse_transform(self._ensure_column_vector(preds)) def update_model(self, X_train: pd.Series, y_train: float) -> None: + """ + Updates the model with new training data by appending it to the historical data and retraining the model. + + :param X_train: New training features. + :param y_train: New training labels. 
+ """ # Append the newly incoming data to maintain all historical data - self.X_train_full = np.concatenate((self.X_train_full, [X_train])) - self.y_train_full = np.concatenate((self.y_train_full, np.array([y_train]).reshape(-1, 1))) + self.X_train_full = np.concatenate((self.X_train_full, self._ensure_column_vector(X_train))) + self.y_train_full = np.concatenate((self.y_train_full, self._ensure_column_vector([y_train]))) # Scaling of data with all historical data self.train_X_scaler = self.train_X_scaler.fit(self.X_train_full) @@ -43,4 +82,4 @@ def update_model(self, X_train: pd.Series, y_train: float) -> None: # Retrain existing model with scaled data self.regressor.fit(self.train_X_scaler.transform(self.X_train_full), - self.train_y_scaler.transform(self.y_train_full)) + self.train_y_scaler.transform(self.y_train_full)) \ No newline at end of file diff --git a/approach/knn_regression_predictor.py b/approach/knn_regression_predictor.py index 9cedb51..992b6cb 100644 --- a/approach/knn_regression_predictor.py +++ b/approach/knn_regression_predictor.py @@ -1,50 +1,136 @@ +import itertools import numpy as np import pandas as pd -import logging from sklearn.metrics import make_scorer -from sklearn.model_selection import cross_val_score, LeaveOneOut, GridSearchCV +from sklearn.model_selection import GridSearchCV from sklearn.neighbors import KNeighborsRegressor -from sklearn.neural_network import MLPRegressor + from sklearn.preprocessing import MinMaxScaler from warnings import simplefilter from sklearn.exceptions import ConvergenceWarning +from approach import helper from approach.abstract_predictor import PredictionModel simplefilter("ignore", category=ConvergenceWarning) class KNNPredictor(PredictionModel): + """K-Nearest Neighbour regressor with optional online grid search. + + By default the model mirrors the original implementation and performs a full + re-training on *all* historical data whenever a new sample is added. 
When + ``use_online_grid`` is enabled an online grid-search style procedure keeps a + pool of ``KNeighborsRegressor`` instances and updates them on every new + sample. + + Attributes: + X_train_full (np.ndarray): Historical training features. + y_train_full (np.ndarray): Historical training labels. + train_X_scaler (MinMaxScaler): Scaler for normalizing training features. + train_y_scaler (MinMaxScaler): Scaler for normalizing training labels. + regressor (KNeighborsRegressor): Trained KNN model. + model_error (float): Error score of the best model. + best_params (dict): Hyperparameters of the selected model. + err_metr (str): Error metric used for model selection. + """ + + params = { + 'n_neighbors': [2, 3, 5, 7, 9], + 'weights': ['uniform', 'distance'], + 'algorithm': ['auto', 'ball_tree', 'kd_tree', 'brute'] + } + + def __init__(self, workflow_name: str, task_name: str, err_metr: str, + use_online_grid: bool = False, + num_full_online_partials: int = 10, + save_top_n: int = 10, + param_cull_cutoff: int = -5, + param_cull_plus_percent: float = 0.3, + param_cull_minus_percent: float = 0.3): + super().__init__(workflow_name, task_name, err_metr) + self.use_online_grid = use_online_grid + if self.use_online_grid: + self.param_order = tuple(self.params.keys()) + self.param_grid = {k: None for k in itertools.product(*self.params.values())} + self.param_points = {k: 0 for k in itertools.product(*self.params.values())} + self.num_full_online_partials = num_full_online_partials + self.save_top_n = save_top_n + self.param_cull_cutoff = param_cull_cutoff + self.param_cull_plus_percent = param_cull_plus_percent + self.param_cull_minus_percent = param_cull_minus_percent def initial_model_training(self, X_train, y_train) -> None: + """ + Initializes the KNN model with training data. + + :param X_train: Training features. + :param y_train: Training labels. 
+ """ # Initialize internal storage of historical values - self.X_train_full = X_train - self.y_train_full = y_train + self.X_train_full = self._ensure_column_vector(X_train) + self.y_train_full = self._ensure_column_vector(y_train) - self._selectBestModel(X_train, y_train) + if self.use_online_grid: + self._initial_online_grid(self.X_train_full, self.y_train_full) + else: + self._selectBestModel(self.X_train_full, self.y_train_full) def predict_task(self, task_features: pd.Series) -> float: - task_features_scaled = self.train_X_scaler.transform(task_features.values.reshape(-1, 1)) - return self.train_y_scaler.inverse_transform(self.regressor.predict(task_features_scaled).reshape(-1, 1)) + """ + Predicts the output for a single task based on its features using the trained KNN model. + + :param task_features: Features of the task to predict. + + :return: Predicted value for the task. + """ + task_features_scaled = self.train_X_scaler.transform(self._ensure_column_vector(task_features)) + preds = self.regressor.predict(task_features_scaled) + return self.train_y_scaler.inverse_transform(self._ensure_column_vector(preds)) def predict_tasks(self, taskDataframe: pd.DataFrame) -> np.ndarray: + """ + Predicts the output for multiple tasks based on their features using the trained KNN model. + + :param taskDataframe: DataFrame containing features of multiple tasks. + + :return: Array of predicted values for the tasks. + """ taskDataframe_scaled = self.train_X_scaler.transform(taskDataframe) - return self.train_y_scaler.inverse_transform(self.regressor.predict(taskDataframe_scaled).reshape(-1, 1)) + preds = self.regressor.predict(taskDataframe_scaled) + return self.train_y_scaler.inverse_transform(self._ensure_column_vector(preds)) def update_model(self, X_train: pd.Series, y_train: float) -> None: + """Update the model with a new sample. + + :param X_train: New training features. + :param y_train: New training labels. 
+ """ + if self.use_online_grid: + self._update_online_grid(np.asarray(X_train), y_train) + return + # Append the newly incoming data to maintain all historical data - self.X_train_full = np.concatenate((self.X_train_full, [X_train])) - self.y_train_full = np.concatenate((self.y_train_full, np.array([y_train]).reshape(-1, 1))) + self.X_train_full = np.concatenate((self.X_train_full, self._ensure_column_vector(X_train))) + self.y_train_full = np.concatenate((self.y_train_full, self._ensure_column_vector([y_train]))) - # Scaling of data with all historical data self.train_X_scaler = self.train_X_scaler.fit(self.X_train_full) self.train_y_scaler = self.train_y_scaler.fit(self.y_train_full) self._selectBestModel(self.X_train_full, self.y_train_full) def smoothed_mape(self, y_true, y_pred, epsilon=1e-8): + """ + Calculates the smoothed Mean Absolute Percentage Error (MAPE) between true and predicted values. + + :param y_true: True values. + :param y_pred: Predicted values. + :param epsilon: Small value to avoid division by zero. + + :return: Smoothed MAPE value. + """ y_true, y_pred = np.array(y_true), np.array(y_pred) # Calculate the individual percentage errors and clip each at 100% mape = np.abs((y_true - y_pred) / (y_true + epsilon)) @@ -52,20 +138,29 @@ def smoothed_mape(self, y_true, y_pred, epsilon=1e-8): return np.mean(mape) def _selectBestModel(self, X_train, y_train): + """ + Fit a ``KNeighborsRegressor`` using grid search to select the best + hyperparameters. + The best parameters are stored so that subsequent updates can reuse the + same configuration without running grid search again. + + :param X_train: Training features. + :param y_train: Training labels. 
+ """ self.train_X_scaler = MinMaxScaler() self.train_y_scaler = MinMaxScaler() # Scale Features X_train_scaled = self.train_X_scaler.fit_transform(X_train) - y_train_scaled = self.train_y_scaler.fit_transform(y_train.reshape(-1, 1)) # Ensure y is correctly shaped + y_train_scaled = self.train_y_scaler.fit_transform(self._ensure_column_vector(y_train)) smoothed_mape_scorer = make_scorer(self.smoothed_mape, greater_is_better=True) param_grid = { - 'n_neighbors': [2, 3, 5, 7, 9], - 'weights': ['uniform', 'distance'], - 'algorithm': ['auto', 'ball_tree', 'kd_tree', 'brute'] + "n_neighbors": [2, 3, 5, 7, 9], + "weights": ["uniform", "distance"], + "algorithm": ["auto", "ball_tree", "kd_tree", "brute"], } model = KNeighborsRegressor() @@ -86,3 +181,71 @@ def _selectBestModel(self, X_train, y_train): self.model_error = best_score self.regressor = best_model + + # ------------------------------------------------------------------ + # Online grid-search style training used when ``use_online_grid`` is True + # ------------------------------------------------------------------ + + def _initial_online_grid(self, X_train, y_train) -> None: + self.train_X_scaler = MinMaxScaler() + self.train_y_scaler = MinMaxScaler() + + X_scaled = self.train_X_scaler.fit_transform(X_train) + y_scaled = self.train_y_scaler.fit_transform(self._ensure_column_vector(y_train)) + + scores = [] + for params in self.param_grid.keys(): + if self.param_points[params] <= self.param_cull_cutoff: + continue + model = KNeighborsRegressor(**{k: v for k, v in zip(self.param_order, params)}) + model.fit(X_scaled, y_scaled.ravel()) + self.param_grid[params] = model + scores.append((params, model, model.score(X_scaled, y_scaled.ravel()))) + + scores.sort(key=lambda x: x[2]) + + for params, _, _ in scores[:min(int(round(len(scores) * self.param_cull_minus_percent)) + 1, + max(0, len(scores) - self.save_top_n))]: + self.param_points[params] -= 1 + + for params, _, _ in scores[min(int(round(len(scores) * (1 - 
self.param_cull_plus_percent))) + 1, + max(0, len(scores) - self.save_top_n)):]: + self.param_points[params] += 1 + + _, best_model, best_score = scores[-1] + self.model_error = best_score + self.regressor = best_model + helper.log_if_verbose(f"Best Score for KNN: {best_score}") + + def _update_online_grid(self, X_train: np.ndarray, y_train: float) -> None: + self.X_train_full = np.concatenate((self.X_train_full, [X_train])) + self.y_train_full = np.concatenate((self.y_train_full, np.array([y_train]).reshape(-1, 1))) + + self.train_X_scaler = self.train_X_scaler.fit(self.X_train_full) + self.train_y_scaler = self.train_y_scaler.fit(self.y_train_full) + + transformed_X = self.train_X_scaler.transform(self.X_train_full) + transformed_y = self.train_y_scaler.transform(self.y_train_full).ravel() + + scores = [] + for params, model in self.param_grid.items(): + if self.param_points[params] <= self.param_cull_cutoff: + continue + model.fit(transformed_X, transformed_y) + self.param_grid[params] = model + scores.append((params, model, model.score(transformed_X, transformed_y))) + + scores.sort(key=lambda x: x[2]) + + for params, _, _ in scores[:min(int(round(len(scores) * self.param_cull_minus_percent)) + 1, + max(0, len(scores) - self.save_top_n))]: + self.param_points[params] -= 1 + + for params, _, _ in scores[min(int(round(len(scores) * (1 - self.param_cull_plus_percent))) + 1, + max(0, len(scores) - self.save_top_n)):]: + self.param_points[params] += 1 + + _, best_model, best_score = scores[-1] + self.model_error = best_score + self.regressor = best_model + helper.log_if_verbose(f"Best Score for KNN: {best_score}") \ No newline at end of file diff --git a/approach/linear_regression_predictor.py b/approach/linear_regression_predictor.py index aa6b8ba..494bbda 100644 --- a/approach/linear_regression_predictor.py +++ b/approach/linear_regression_predictor.py @@ -1,52 +1,128 @@ +import itertools import numpy as np import pandas as pd -import logging -from 
sklearn.linear_model import SGDRegressor, Lasso +from sklearn.linear_model import LinearRegression, SGDRegressor from sklearn.metrics import make_scorer from sklearn.model_selection import GridSearchCV -from sklearn.neural_network import MLPRegressor from sklearn.preprocessing import MinMaxScaler -from sklearn.linear_model import Ridge -import matplotlib.pyplot as plt -from sklearn.linear_model import LinearRegression -from sklearn.linear_model import RidgeCV -from sklearn import linear_model +from approach import helper from approach.abstract_predictor import PredictionModel class LinearPredictor(PredictionModel): + """Linear regression predictor with optional online grid search. + + The default behaviour mirrors the original implementation where the model is + fully retrained on *all* available data whenever new samples arrive. When + ``use_online_grid`` is enabled an online grid-search style training using + ``SGDRegressor`` is performed instead. + + Attributes: + X_train_full (np.ndarray): Historical training features. + y_train_full (np.ndarray): Historical training labels. + train_X_scaler (MinMaxScaler): Scaler for normalizing training features. + train_y_scaler (MinMaxScaler): Scaler for normalizing training labels. + regressor (SGDRegressor): Trained SGD regression model. + model_error (float): Error score of the best model. + err_metr (str): Error metric used for model selection. 
+ """ + + params = { + 'alpha': [0.0001, 0.001], + 'penalty': ['l2', 'l1', 'elasticnet'], + 'learning_rate': ['optimal', 'invscaling'] + } + + def __init__(self, workflow_name: str, task_name: str, err_metr: str, + use_online_grid: bool = False, + num_full_online_partials: int = 10, + save_top_n: int = 10, + param_cull_cutoff: int = -5, + param_cull_plus_percent: float = 0.3, + param_cull_minus_percent: float = 0.3): + + """Initialize the predictor.""" + super().__init__(workflow_name, task_name, err_metr) + self.use_online_grid = use_online_grid + if self.use_online_grid: + self.param_order = tuple(self.params.keys()) + self.param_grid = {k: None for k in itertools.product(*self.params.values())} + self.param_points = {k: 0 for k in itertools.product(*self.params.values())} + self.num_full_online_partials = num_full_online_partials + self.save_top_n = save_top_n + self.param_cull_cutoff = param_cull_cutoff + self.param_cull_plus_percent = param_cull_plus_percent + self.param_cull_minus_percent = param_cull_minus_percent def initial_model_training(self, X_train, y_train) -> None: + """Initialize the regression model with training data. - self.X_train_full = X_train - self.y_train_full = y_train + :param X_train: Training features. + :param y_train: Training labels. + """ + self.X_train_full = self._ensure_column_vector(X_train) + self.y_train_full = self._ensure_column_vector(y_train) - self._select_best_model(X_train, y_train) + if self.use_online_grid: + self._initial_online_grid(self.X_train_full, self.y_train_full) + else: + self._select_best_model(self._ensure_column_vector(X_train), self._ensure_column_vector(y_train)) def predict_task(self, task_features: pd.Series) -> float: - task_features_scaled = self.train_X_scaler.transform(task_features.values.reshape(-1, 1)) - return self.train_y_scaler.inverse_transform(self.regressor.predict(task_features_scaled).reshape(-1, 1)) + """Predict the output for a single task. 
+ + :param task_features: Features of the task to predict. + + :return: Predicted value for the task. + """ + task_features_scaled = self.train_X_scaler.transform(self._ensure_column_vector(task_features)) + preds = self.regressor.predict(task_features_scaled) + return self.train_y_scaler.inverse_transform(self._ensure_column_vector(preds)) def predict_tasks(self, taskDataframe: pd.DataFrame) -> np.ndarray: + """Predict the output for multiple tasks. + + :param taskDataframe: DataFrame containing features of multiple tasks. + + :return: Array of predicted values for the tasks. + """ taskDataframe_scaled = self.train_X_scaler.transform(taskDataframe) - return self.train_y_scaler.inverse_transform(self.regressor.predict(taskDataframe_scaled).reshape(-1, 1)) + preds = self.regressor.predict(taskDataframe_scaled) + return self.train_y_scaler.inverse_transform(self._ensure_column_vector(preds)) def update_model(self, X_train: pd.Series, y_train: float) -> None: - # Append the newly incoming data to maintain all historical data - self.X_train_full = np.concatenate((self.X_train_full, [X_train])) - self.y_train_full = np.concatenate((self.y_train_full, np.array([y_train]).reshape(-1, 1))) - - # Scaling of data with all historical data + """Update the model with a new sample.""" + if self.use_online_grid: + self._update_online_grid(np.asarray(X_train), y_train) + return + # Append new data to history + self.X_train_full = np.concatenate( + (self.X_train_full, self._ensure_column_vector(X_train)) + ) + self.y_train_full = np.concatenate( + (self.y_train_full, self._ensure_column_vector([y_train])) + ) + + # Refit scalers on complete history self.train_X_scaler = self.train_X_scaler.fit(self.X_train_full) self.train_y_scaler = self.train_y_scaler.fit(self.y_train_full) - # Retrain existing model with scaled data + # Full retraining on all data self._select_best_model(self.X_train_full, self.y_train_full) def smoothed_mape(self, y_true, y_pred, epsilon=1e-8): + """ + 
Calculates the smoothed Mean Absolute Percentage Error (MAPE) between true and predicted values. + + :param y_true: True values. + :param y_pred: Predicted values. + :param epsilon: Small value to avoid division by zero. + + :return: Smoothed MAPE value. + """ y_true, y_pred = np.array(y_true), np.array(y_pred) # Calculate the individual percentage errors and clip each at 100% mape = np.abs((y_true - y_pred) / (y_true + epsilon)) @@ -54,18 +130,21 @@ def smoothed_mape(self, y_true, y_pred, epsilon=1e-8): return np.mean(mape) def _select_best_model(self, X_train, y_train): + """Fit a ``LinearRegression`` model via cross-validation. + :param X_train: Training features. + :param y_train: Training labels. + """ self.train_X_scaler = MinMaxScaler() self.train_y_scaler = MinMaxScaler() # Scale Features X_train_scaled = self.train_X_scaler.fit_transform(X_train) - y_train_scaled = self.train_y_scaler.fit_transform(y_train) + y_train_scaled = self.train_y_scaler.fit_transform(self._ensure_column_vector(y_train)) smoothed_mape_scorer = make_scorer(self.smoothed_mape, greater_is_better=True) - param_grid = { - } + param_grid = {} model = LinearRegression() @@ -84,3 +163,82 @@ def _select_best_model(self, X_train, y_train): self.model_error = best_score self.regressor = best_model + + # ------------------------------------------------------------------ + # Online grid-search style training used when ``use_online_grid`` is True + # ------------------------------------------------------------------ + + def _initial_online_grid(self, X_train, y_train) -> None: + self.train_X_scaler = MinMaxScaler() + self.train_y_scaler = MinMaxScaler() + + X_scaled = self.train_X_scaler.fit_transform(X_train) + y_scaled = self.train_y_scaler.fit_transform(self._ensure_column_vector(y_train)) + + scores = [] + for params in self.param_grid.keys(): + if self.param_points[params] <= self.param_cull_cutoff: + continue + model = SGDRegressor(random_state=42, max_iter=1000, + **{k: v for k, v in 
zip(self.param_order, params)}) + model.fit(X_scaled, y_scaled.ravel()) + self.param_grid[params] = model + scores.append((params, model, model.score(X_scaled, y_scaled.ravel()))) + + scores.sort(key=lambda x: x[2]) + + for params, _, _ in scores[:min(int(round(len(scores) * self.param_cull_minus_percent)) + 1, + max(0, len(scores) - self.save_top_n))]: + self.param_points[params] -= 1 + + for params, _, _ in scores[min(int(round(len(scores) * (1 - self.param_cull_plus_percent))) + 1, + max(0, len(scores) - self.save_top_n)):]: + self.param_points[params] += 1 + + _, best_model, best_score = scores[-1] + self.model_error = best_score + self.regressor = best_model + helper.log_if_verbose(f"Best Score for LinearRegressor: {best_score}") + + def _update_online_grid(self, X_train: np.ndarray, y_train: float) -> None: + self.X_train_full = np.concatenate((self.X_train_full, [X_train])) + self.y_train_full = np.concatenate((self.y_train_full, np.array([y_train]).reshape(-1, 1))) + + self.train_X_scaler = self.train_X_scaler.fit(self.X_train_full) + self.train_y_scaler = self.train_y_scaler.fit(self.y_train_full) + + transformed_X = self.train_X_scaler.transform(self.X_train_full) + transformed_y = self.train_y_scaler.transform(self.y_train_full).ravel() + + scores = [] + for params, model in self.param_grid.items(): + if self.param_points[params] <= self.param_cull_cutoff: + continue + model.partial_fit(self.train_X_scaler.transform(np.array(X_train).reshape(1, -1)), + self.train_y_scaler.transform(np.array([y_train]).reshape(1, -1))) + early_quitting = [model.score(transformed_X, transformed_y)] + for _ in range(self.num_full_online_partials): + model.partial_fit(self.train_X_scaler.transform(self.X_train_full), + self.train_y_scaler.transform(self.y_train_full).ravel()) + early_quitting.append(model.score(transformed_X, transformed_y)) + if len(early_quitting) >= 12: + if sum(helper.nth_deltas(early_quitting, 2, 10)) >= -1e-4: + break + self.param_grid[params] = model 
+ scores.append((params, model, model.score(transformed_X, transformed_y))) + + scores.sort(key=lambda x: x[2]) + + for params, _, _ in scores[:min(int(round(len(scores) * self.param_cull_minus_percent)) + 1, + max(0, len(scores) - self.save_top_n))]: + self.param_points[params] -= 1 + + for params, _, _ in scores[min(int(round(len(scores) * (1 - self.param_cull_plus_percent))) + 1, + max(0, len(scores) - self.save_top_n)):]: + self.param_points[params] += 1 + + _, best_model, best_score = scores[-1] + self.model_error = best_score + self.regressor = best_model + helper.log_if_verbose(f"Best Score for LinearRegressor: {best_score}") + diff --git a/approach/neural_network_predictor.py b/approach/neural_network_predictor.py index 443414f..8ac0388 100644 --- a/approach/neural_network_predictor.py +++ b/approach/neural_network_predictor.py @@ -1,50 +1,143 @@ +import itertools import numpy as np import pandas as pd -import logging from sklearn.metrics import make_scorer -from sklearn.model_selection import cross_val_score, LeaveOneOut, GridSearchCV +from sklearn.model_selection import GridSearchCV from sklearn.neural_network import MLPRegressor from sklearn.preprocessing import MinMaxScaler from warnings import simplefilter from sklearn.exceptions import ConvergenceWarning - +from approach import helper from approach.abstract_predictor import PredictionModel simplefilter("ignore", category=ConvergenceWarning) class NeuralNetworkPredictor(PredictionModel): + """ + NeuralNetworkPredictor is a class that implements a machine learning model for regression tasks using a Neural Network. + + This class extends the `PredictionModel` base class and provides methods for training, predicting, and updating + the Neural Network model. It also includes functionality for hyperparameter tuning and model selection based on + specified error metrics. + + Attributes: + X_train_full (np.ndarray): Historical training features. + y_train_full (np.ndarray): Historical training labels. 
+ train_X_scaler (MinMaxScaler): Scaler for normalizing training features. + train_y_scaler (MinMaxScaler): Scaler for normalizing training labels. + regressor (MLPRegressor): Trained Neural Network model. + model_error (float): Error score of the best model. + best_params (dict): Hyperparameters of the selected model. + err_metr (str): Error metric used for model selection. + """ + + params = { + 'hidden_layer_sizes': [(5, 5), (7, 7), (9, 9), (10, 10), (20, 20)], + 'alpha': [0.0001, 0.001], + 'solver': ['lbfgs', 'adam'], + 'learning_rate': ['constant', 'adaptive'] + } + + def __init__(self, workflow_name: str, task_name: str, err_metr: str, + use_online_grid: bool = False, + num_full_online_partials: int = 10, + save_top_n: int = 10, + param_cull_cutoff: int = -5, + param_cull_plus_percent: float = 0.3, + param_cull_minus_percent: float = 0.3): + """Initialize the predictor.""" + super().__init__(workflow_name, task_name, err_metr) + self.use_online_grid = use_online_grid + if self.use_online_grid: + self.param_order = tuple(self.params.keys()) + self.param_grid = {k: None for k in itertools.product(*self.params.values())} + self.param_points = {k: 0 for k in itertools.product(*self.params.values())} + self.num_full_online_partials = num_full_online_partials + self.save_top_n = save_top_n + self.param_cull_cutoff = param_cull_cutoff + self.param_cull_plus_percent = param_cull_plus_percent + self.param_cull_minus_percent = param_cull_minus_percent def initial_model_training(self, X_train, y_train) -> None: + """ + Initializes the Neural Network model with training data. + + :param X_train: Training features. + :param y_train: Training labels. 
+ """ # Initialize internal storage of historical values - self.X_train_full = X_train - self.y_train_full = y_train + self.X_train_full = self._ensure_column_vector(X_train) + self.y_train_full = self._ensure_column_vector(y_train) - self._selectBestModel(X_train, y_train) + if self.use_online_grid: + self._initial_online_grid(self.X_train_full, self.y_train_full) + else: + self._selectBestModel(self.X_train_full, self.y_train_full) def predict_task(self, task_features: pd.Series) -> float: - task_features_scaled = self.train_X_scaler.transform(task_features.values.reshape(-1, 1)) - return self.train_y_scaler.inverse_transform(self.regressor.predict(task_features_scaled).reshape(-1, 1)) + """ + Predicts the output for a single task based on its features using the trained Neural Network model. + + :param task_features: Features of the task to predict. + + :return: Predicted value for the task. + """ + task_features_scaled = self.train_X_scaler.transform(self._ensure_column_vector(task_features)) + preds = self.regressor.predict(task_features_scaled) + return self.train_y_scaler.inverse_transform(self._ensure_column_vector(preds)) def predict_tasks(self, taskDataframe: pd.DataFrame) -> np.ndarray: + """ + Predicts the output for multiple tasks based on their features using the trained Neural Network model. + + :param taskDataframe: DataFrame containing features of multiple tasks. + + :return: Array of predicted values for the tasks. 
+ """ taskDataframe_scaled = self.train_X_scaler.transform(taskDataframe) - return self.train_y_scaler.inverse_transform(self.regressor.predict(taskDataframe_scaled).reshape(-1, 1)) + preds = self.regressor.predict(taskDataframe_scaled) + return self.train_y_scaler.inverse_transform(self._ensure_column_vector(preds)) def update_model(self, X_train: pd.Series, y_train: float) -> None: - # Append the newly incoming data to maintain all historical data - self.X_train_full = np.concatenate((self.X_train_full, [X_train])) - self.y_train_full = np.concatenate((self.y_train_full, np.array([y_train]).reshape(-1, 1))) + """Update the model with a new sample. + + When ``use_online_grid`` is ``False`` the sample is appended to the + stored training data, scalers are refit on the full dataset, and the + model is retrained via ``_selectBestModel``. If ``use_online_grid`` is + ``True`` the update is delegated to ``_update_online_grid``. + """ + if self.use_online_grid: + self._update_online_grid(np.asarray(X_train), y_train) + return + + self.X_train_full = np.concatenate( + (self.X_train_full, self._ensure_column_vector(X_train)) + ) + self.y_train_full = np.concatenate( + (self.y_train_full, self._ensure_column_vector([y_train])) + ) # Scaling of data with all historical data self.train_X_scaler = self.train_X_scaler.fit(self.X_train_full) self.train_y_scaler = self.train_y_scaler.fit(self.y_train_full) + # Refit scalers and retrain model on all available data self._selectBestModel(self.X_train_full, self.y_train_full) def smoothed_mape(self, y_true, y_pred, epsilon=1e-8): + """ + Computes the smoothed Mean Absolute Percentage Error (MAPE) between true and predicted values. + + :param y_true: True values. + :param y_pred: Predicted values. + :param epsilon: Small value to avoid division by zero. + + :return: Smoothed MAPE value. 
+ """ y_true, y_pred = np.array(y_true), np.array(y_pred) # Calculate the individual percentage errors and clip each at 100% mape = np.abs((y_true - y_pred) / (y_true + epsilon)) @@ -52,13 +145,19 @@ def smoothed_mape(self, y_true, y_pred, epsilon=1e-8): return np.mean(mape) def _selectBestModel(self, X_train, y_train): + """ + Fit an ``MLPRegressor`` using grid search to select the best hyperparameters. + :param X_train: Training features. + :param y_train: Training labels. + """ + # Reinitialize scalers on the provided data self.train_X_scaler = MinMaxScaler() self.train_y_scaler = MinMaxScaler() # Scale Features X_train_scaled = self.train_X_scaler.fit_transform(X_train) - y_train_scaled = self.train_y_scaler.fit_transform(y_train) + y_train_scaled = self.train_y_scaler.fit_transform(self._ensure_column_vector(y_train)) smoothed_mape_scorer = make_scorer(self.smoothed_mape, greater_is_better=True) @@ -84,10 +183,85 @@ def _selectBestModel(self, X_train, y_train): best_model = grid_search.best_estimator_ self.model_error = best_score - - - + self.best_params = grid_search.best_params_ + self.regressor = best_model + + # ------------------------------------------------------------------ + # Online grid-search style training used when ``use_online_grid`` is True + # ------------------------------------------------------------------ + + def _initial_online_grid(self, X_train, y_train) -> None: + self.train_X_scaler = MinMaxScaler() + self.train_y_scaler = MinMaxScaler() + + X_scaled = self.train_X_scaler.fit_transform(X_train) + y_scaled = self.train_y_scaler.fit_transform(self._ensure_column_vector(y_train)) + + scores = [] + for params in self.param_grid.keys(): + if self.param_points[params] <= self.param_cull_cutoff: + continue + model = MLPRegressor(random_state=42, max_iter=5000, + **{k: v for k, v in zip(self.param_order, params)}) + model.fit(X_scaled, y_scaled.ravel()) + self.param_grid[params] = model + scores.append((params, model, model.score(X_scaled, 
y_scaled.ravel()))) + + scores.sort(key=lambda x: x[2]) + + for params, _, _ in scores[:min(int(round(len(scores) * self.param_cull_minus_percent)) + 1, + max(0, len(scores) - self.save_top_n))]: + self.param_points[params] -= 1 + + for params, _, _ in scores[min(int(round(len(scores) * (1 - self.param_cull_plus_percent))) + 1, + max(0, len(scores) - self.save_top_n)):]: + self.param_points[params] += 1 + + _, best_model, best_score = scores[-1] + self.model_error = best_score + self.regressor = best_model + helper.log_if_verbose(f"Best Score for NN: {best_score}") + def _update_online_grid(self, X_train: np.ndarray, y_train: float) -> None: + self.X_train_full = np.concatenate((self.X_train_full, [X_train])) + self.y_train_full = np.concatenate((self.y_train_full, np.array([y_train]).reshape(-1, 1))) + + self.train_X_scaler = self.train_X_scaler.fit(self.X_train_full) + self.train_y_scaler = self.train_y_scaler.fit(self.y_train_full) + + transformed_X = self.train_X_scaler.transform(self.X_train_full) + transformed_y = self.train_y_scaler.transform(self.y_train_full).ravel() + + scores = [] + for params, model in self.param_grid.items(): + if self.param_points[params] <= self.param_cull_cutoff: + continue + model.partial_fit(self.train_X_scaler.transform(np.array(X_train).reshape(1, -1)), + self.train_y_scaler.transform(np.array([y_train]).reshape(1, -1))) + early_quitting = [model.score(transformed_X, transformed_y)] + for _ in range(self.num_full_online_partials): + model.partial_fit(self.train_X_scaler.transform(self.X_train_full), + self.train_y_scaler.transform(self.y_train_full)) + early_quitting.append(model.score(transformed_X, transformed_y)) + if len(early_quitting) >= 12: + if sum(helper.nth_deltas(early_quitting, 2, 10)) >= -1e-4: + break + self.param_grid[params] = model + scores.append((params, model, model.score(transformed_X, transformed_y))) + + scores.sort(key=lambda x: x[2]) + + for params, _, _ in scores[:min(int(round(len(scores) * 
self.param_cull_minus_percent)) + 1, + max(0, len(scores) - self.save_top_n))]: + self.param_points[params] -= 1 + + for params, _, _ in scores[min(int(round(len(scores) * (1 - self.param_cull_plus_percent))) + 1, + max(0, len(scores) - self.save_top_n)):]: + self.param_points[params] += 1 + + _, best_model, best_score = scores[-1] + self.model_error = best_score self.regressor = best_model + helper.log_if_verbose(f"Best Score for NN: {best_score}") diff --git a/approach/random_forest_predictor.py b/approach/random_forest_predictor.py index ea30cbd..0175904 100644 --- a/approach/random_forest_predictor.py +++ b/approach/random_forest_predictor.py @@ -1,50 +1,141 @@ import numpy as np import pandas as pd -import logging from sklearn.ensemble import RandomForestRegressor from sklearn.metrics import make_scorer -from sklearn.model_selection import cross_val_score, LeaveOneOut, GridSearchCV -from sklearn.neural_network import MLPRegressor +from sklearn.model_selection import GridSearchCV from sklearn.preprocessing import MinMaxScaler from warnings import simplefilter from sklearn.exceptions import ConvergenceWarning +import itertools from approach.abstract_predictor import PredictionModel +from approach import helper simplefilter("ignore", category=ConvergenceWarning) - class RandomForestPredictor(PredictionModel): + """ + RandomForestPredictor is a class that implements a machine learning model for regression tasks using Random Forest. + + This class extends the `PredictionModel` base class and provides methods for training, predicting, and updating + the Random Forest model. It also includes functionality for selecting the best model based on specified error metrics. + + Attributes: + X_train_full (np.ndarray): Historical training features. + y_train_full (np.ndarray): Historical training labels. + train_X_scaler (MinMaxScaler): Scaler for normalizing training features. + train_y_scaler (MinMaxScaler): Scaler for normalizing training labels. 
+ regressor (RandomForestRegressor): Trained Random Forest model. + model_error (float): Error score of the best model. + best_params (dict): Hyperparameters of the selected model. + err_metr (str): Error metric used for model selection. + """ + + params = { + 'n_estimators': [10, 25, 50], + 'max_depth': [3, 5, 7], + 'min_samples_split': [2, 4], + 'min_samples_leaf': [1, 2], + 'max_features': [1] + } + + def __init__(self, workflow_name: str, task_name: str, err_metr: str, + use_online_grid: bool = False, + num_full_online_partials: int = 10, + save_top_n: int = 10, + param_cull_cutoff: int = -5, + param_cull_plus_percent: float = 0.3, + param_cull_minus_percent: float = 0.3): + """Initialize the predictor.""" + super().__init__(workflow_name, task_name, err_metr) + self.use_online_grid = use_online_grid + if self.use_online_grid: + self.param_order = tuple(self.params.keys()) + self.param_grid = {k: None for k in itertools.product(*self.params.values())} + self.param_points = {k: 0 for k in itertools.product(*self.params.values())} + self.num_full_online_partials = num_full_online_partials + self.save_top_n = save_top_n + self.param_cull_cutoff = param_cull_cutoff + self.param_cull_plus_percent = param_cull_plus_percent + self.param_cull_minus_percent = param_cull_minus_percent def initial_model_training(self, X_train, y_train) -> None: + """ + Initializes the Random Forest model with training data. + + :param X_train: Training features. + :param y_train: Training labels. 
+ """ # Initialize internal storage of historical values - self.X_train_full = X_train - self.y_train_full = y_train + self.X_train_full = self._ensure_column_vector(X_train) + self.y_train_full = self._ensure_column_vector(y_train) - self._selectBestModel(X_train, y_train) + if self.use_online_grid: + self._initial_online_grid(self.X_train_full, self.y_train_full) + else: + self._selectBestModel(self.X_train_full, self.y_train_full) def predict_task(self, task_features: pd.Series) -> float: - task_features_scaled = self.train_X_scaler.transform(task_features.values.reshape(-1, 1)) - return self.train_y_scaler.inverse_transform(self.regressor.predict(task_features_scaled).reshape(-1, 1)) + """ + Predicts the output for a single task based on its features using the trained Random Forest model. + + :param task_features: Features of the task to predict. + + :return: Predicted value for the task. + """ + task_features_scaled = self.train_X_scaler.transform(self._ensure_column_vector(task_features)) + preds = self.regressor.predict(task_features_scaled) + return self.train_y_scaler.inverse_transform(self._ensure_column_vector(preds)) def predict_tasks(self, taskDataframe: pd.DataFrame) -> np.ndarray: + """ + Predicts the output for multiple tasks based on their features using the trained Random Forest model. + + :param taskDataframe: DataFrame containing features of multiple tasks. + + :return: Array of predicted values for the tasks. + """ taskDataframe_scaled = self.train_X_scaler.transform(taskDataframe) - return self.train_y_scaler.inverse_transform(self.regressor.predict(taskDataframe_scaled).reshape(-1, 1)) + preds = self.regressor.predict(taskDataframe_scaled) + return self.train_y_scaler.inverse_transform(self._ensure_column_vector(preds)) def update_model(self, X_train: pd.Series, y_train: float) -> None: + """Update the model with a new training sample. + + :param X_train: New training features. + :param y_train: New training labels. 
+ """ + if self.use_online_grid: + self._update_online_grid(np.asarray(X_train), y_train) + return + # Append the newly incoming data to maintain all historical data - self.X_train_full = np.concatenate((self.X_train_full, [X_train])) - self.y_train_full = np.concatenate((self.y_train_full, np.array([y_train]).reshape(-1, 1))) + self.X_train_full = np.concatenate( + (self.X_train_full, self._ensure_column_vector(X_train)) + ) + self.y_train_full = np.concatenate( + (self.y_train_full, self._ensure_column_vector([y_train])) + ) # Scaling of data with all historical data self.train_X_scaler = self.train_X_scaler.fit(self.X_train_full) self.train_y_scaler = self.train_y_scaler.fit(self.y_train_full) + # Refit scalers and model on the complete dataset self._selectBestModel(self.X_train_full, self.y_train_full) def smoothed_mape(self, y_true, y_pred, epsilon=1e-8): + """ + Calculates the smoothed Mean Absolute Percentage Error (MAPE) between true and predicted values. + + :param y_true: True values. + :param y_pred: Predicted values. + :param epsilon: Small value to avoid division by zero. + + :return: Smoothed MAPE value. + """ y_true, y_pred = np.array(y_true), np.array(y_pred) # Calculate the individual percentage errors and clip each at 100% mape = np.abs((y_true - y_pred) / (y_true + epsilon)) @@ -52,13 +143,18 @@ def smoothed_mape(self, y_true, y_pred, epsilon=1e-8): return np.mean(mape) def _selectBestModel(self, X_train, y_train): + """ + Selects the best Random Forest model based on the specified error metric. + :param X_train: Training features. + :param y_train: Training labels. 
+ """ self.train_X_scaler = MinMaxScaler() self.train_y_scaler = MinMaxScaler() # Scale Features X_train_scaled = self.train_X_scaler.fit_transform(X_train) - y_train_scaled = self.train_y_scaler.fit_transform(y_train.reshape(-1, 1)).ravel() + y_train_scaled = self.train_y_scaler.fit_transform(self._ensure_column_vector(y_train)).ravel() smoothed_mape_scorer = make_scorer(self.smoothed_mape, greater_is_better=True) @@ -86,6 +182,76 @@ def _selectBestModel(self, X_train, y_train): best_score = grid_search.best_score_ best_model = grid_search.best_estimator_ + self.model_error = best_score + self.best_params = grid_search.best_params_ + self.regressor = best_model + + # ------------------------------------------------------------------ + # Online grid-search style training used when ``use_online_grid`` is True + # ------------------------------------------------------------------ + + def _initial_online_grid(self, X_train, y_train) -> None: + self.train_X_scaler = MinMaxScaler() + self.train_y_scaler = MinMaxScaler() + + X_scaled = self.train_X_scaler.fit_transform(X_train) + y_scaled = self.train_y_scaler.fit_transform(self._ensure_column_vector(y_train)) + + scores = [] + for params in self.param_grid.keys(): + if self.param_points[params] <= self.param_cull_cutoff: + continue + model = RandomForestRegressor(random_state=42, + **{k: v for k, v in zip(self.param_order, params)}) + model.fit(X_scaled, y_scaled.ravel()) + self.param_grid[params] = model + scores.append((params, model, model.score(X_scaled, y_scaled.ravel()))) + + scores.sort(key=lambda x: x[2]) + + for params, _, _ in scores[:min(int(round(len(scores) * self.param_cull_minus_percent)) + 1, + max(0, len(scores) - self.save_top_n))]: + self.param_points[params] -= 1 + + for params, _, _ in scores[min(int(round(len(scores) * (1 - self.param_cull_plus_percent))) + 1, + max(0, len(scores) - self.save_top_n)):]: + self.param_points[params] += 1 + + _, best_model, best_score = scores[-1] + self.model_error = 
best_score + self.regressor = best_model + helper.log_if_verbose(f"Best Score for RandomForest: {best_score}") + + def _update_online_grid(self, X_train: np.ndarray, y_train: float) -> None: + self.X_train_full = np.concatenate((self.X_train_full, [X_train])) + self.y_train_full = np.concatenate((self.y_train_full, np.array([y_train]).reshape(-1, 1))) + + self.train_X_scaler = self.train_X_scaler.fit(self.X_train_full) + self.train_y_scaler = self.train_y_scaler.fit(self.y_train_full) + + transformed_X = self.train_X_scaler.transform(self.X_train_full) + transformed_y = self.train_y_scaler.transform(self.y_train_full).ravel() + + scores = [] + for params, model in self.param_grid.items(): + if self.param_points[params] <= self.param_cull_cutoff: + continue + model.fit(transformed_X, transformed_y) + self.param_grid[params] = model + scores.append((params, model, model.score(transformed_X, transformed_y))) + + scores.sort(key=lambda x: x[2]) + + for params, _, _ in scores[:min(int(round(len(scores) * self.param_cull_minus_percent)) + 1, + max(0, len(scores) - self.save_top_n))]: + self.param_points[params] -= 1 + + for params, _, _ in scores[min(int(round(len(scores) * (1 - self.param_cull_plus_percent))) + 1, + max(0, len(scores) - self.save_top_n)):]: + self.param_points[params] += 1 + + _, best_model, best_score = scores[-1] self.model_error = best_score self.regressor = best_model + helper.log_if_verbose(f"Best Score for RandomForest: {best_score}") diff --git a/approach/sizing_tasks.py b/approach/sizing_tasks.py index 37067b6..5fa3c10 100644 --- a/approach/sizing_tasks.py +++ b/approach/sizing_tasks.py @@ -20,6 +20,32 @@ class Sizey(PredictionMethod): + """ + Sizey is a class that implements a memory prediction method using multiple predictors. 
+ + This class extends the `PredictionMethod` base class and provides functionality for predicting memory usage + based on various machine learning models, including linear regression, neural networks, random forests, and KNN. + It supports strategies for handling underprediction, calculating offsets, and dynamically selecting the best + prediction model. + + Attributes: + pred_err_lin (list): List of prediction errors for the linear predictor. + pred_err_nn (list): List of prediction errors for the neural network predictor. + pred_err_rf (list): List of prediction errors for the random forest predictor. + pred_err_knn (list): List of prediction errors for the KNN predictor. + lin_counter (int): Counter for the number of times the linear predictor was used. + nn_counter (int): Counter for the number of times the neural network predictor was used. + rf_counter (int): Counter for the number of times the random forest predictor was used. + knn_counter (int): Counter for the number of times the KNN predictor was used. + max_counter (int): Counter for the number of times the maximum strategy was used. + softmax_counter (int): Counter for the number of times the softmax strategy was used. + max_mem (float): Maximum memory observed during training. + max_input_size (float): Input size corresponding to the maximum memory observed. + kedall_corr (float): Kendall correlation coefficient between input size and memory usage. + failures (list): List of failure cases encountered during prediction. + actualPredictor (object): The currently selected predictor model. 
+ """ + pred_err_lin = [] pred_err_nn = [] pred_err_rf = [] @@ -42,23 +68,42 @@ class Sizey(PredictionMethod): # Initialize Predictors def __init__(self, X_train, y_train, alpha: float, offset_strategy: OFFSET_STRATEGY, default_offset: float, - error_strategy: ERROR_STRATEGY, use_softmax: bool, error_metric: str): - self.linearPredictor = LinearPredictor(workflow_name="Test", task_name="Test", err_metr=error_metric) + error_strategy: ERROR_STRATEGY, use_softmax: bool, error_metric: str, + batch_size: int = 1, retrain_interval: int | None = None, use_online_grid: bool = False): + """ + Initializes the Sizey class with training data and parameters. + + :param X_train: Training features. + :param y_train: Training labels. + :param alpha: Weighting factor for the RAQ score. + :param offset_strategy: Strategy for calculating the offset. + :param default_offset: Default offset value. + :param error_strategy: Strategy for handling underprediction errors. + :param use_softmax: Whether to use softmax for prediction. + :param error_metric: Metric to evaluate prediction errors. + :param batch_size: Number of samples collected before performing an + incremental model update. + :param retrain_interval: After how many mini-batch updates a full + re-training of the linear and neural network models should be + triggered. ``None`` or ``0`` disables the periodic refresh. 
+ """ + self.linearPredictor = LinearPredictor(workflow_name="Test", task_name="Test", err_metr=error_metric, + use_online_grid=use_online_grid) self.neuralNetworkPredictor = NeuralNetworkPredictor(workflow_name="Test", task_name="Test", - err_metr=error_metric) + err_metr=error_metric, use_online_grid=use_online_grid) self.randomForestPredictor = RandomForestPredictor(workflow_name="Test", task_name="Test", - err_metr=error_metric) - self.knnPredictor = KNNPredictor(workflow_name="Test", task_name="Test", err_metr=error_metric) + err_metr=error_metric, use_online_grid=use_online_grid) + self.knnPredictor = KNNPredictor(workflow_name="Test", task_name="Test", + err_metr=error_metric, use_online_grid=use_online_grid) + y_train = np.asarray(y_train).ravel() self._initial_model_training(X_train, y_train) self.alpha = alpha self.offset_strategy = offset_strategy self.default_offset = default_offset self.error_strategy = error_strategy - self.max_mem = max(y_train)[0] - print(y_train) - self.max_input_size = X_train.values[np.where(y_train == self.max_mem)[0][0]][0] - print(self.max_input_size) - self.min_mem = min(y_train)[0] + self.max_mem = np.max(y_train) + self.max_input_size = X_train.values[np.argmax(y_train)][0] + self.min_mem = np.min(y_train) self.X_full = X_train.values self.y_full = y_train self.kedall_corr = stats.kendalltau(self.X_full, self.y_full) @@ -67,6 +112,12 @@ def __init__(self, X_train, y_train, alpha: float, offset_strategy: OFFSET_STRAT # Initial Training def _initial_model_training(self, X_train, y_train) -> None: + """ + Initializes the models with training data. + + :param X_train: Training features. + :param y_train: Training labels. 
+ """ self.linearPredictor.initial_model_training(X_train, y_train) self.neuralNetworkPredictor.initial_model_training(X_train, y_train) self.randomForestPredictor.initial_model_training(X_train, y_train) @@ -74,7 +125,15 @@ def _initial_model_training(self, X_train, y_train) -> None: # RAQ for final prediction def predict(self, X_test: pd.Series, y_test: int, user_estimate) -> (float, float): + """ + Predicts the output for given test data and user estimate using RAQ strategy. + + :param X_test: Test features. + :param y_test: Test label. + :param user_estimate: User's estimate for the task. + :return: Tuple containing the predicted memory and raw prediction. + """ self.X_full = np.concatenate((self.X_full, X_test.values.reshape(-1, 1))) self.y_full = np.append(self.y_full, y_test) self.kedall_corr = stats.kendalltau(self.X_full, self.y_full) @@ -100,7 +159,7 @@ def predict(self, X_test: pd.Series, y_test: int, user_estimate) -> (float, floa self.pred_err_rf.append(((y_test - prediction_rf) / y_test).flatten()[0]) if abs(((y_test - prediction_rf) / y_test).flatten()[0]) > 0.7: - print("Jump into debug") + logging.debug("Jump into debug") offset_knn = self._get_offset(self.offset_strategy, self.default_offset, self.pred_err_knn, self.knnPredictor) prediction_knn = self.knnPredictor.predict_task(X_test) @@ -117,7 +176,6 @@ def predict(self, X_test: pd.Series, y_test: int, user_estimate) -> (float, floa beta * raq_nn) / sum_raq_softmax) + offset_rf * (np.exp( beta * raq_rf) / sum_raq_softmax )+ offset_knn * (np.exp(beta * raq_knn) / sum_raq_softmax) - print(y_pred_softmax) memToPredict = -1 raw_prediction = -1 @@ -160,6 +218,12 @@ def predict(self, X_test: pd.Series, y_test: int, user_estimate) -> (float, floa return memToPredict, raw_prediction def update_model(self, X_train: pd.Series, y_train: float) -> None: + """ + Updates the model with new training data by appending it to the historical data and retraining the models. + + :param X_train: New training features. 
+ :param y_train: New training labels. + """ if y_train > self.max_mem: self.max_mem = y_train self.max_input_size = X_train[0] @@ -169,11 +233,21 @@ def update_model(self, X_train: pd.Series, y_train: float) -> None: self.linearPredictor.update_model(X_train, y_train) self.neuralNetworkPredictor.update_model(X_train, y_train) self.randomForestPredictor.update_model(X_train, y_train) - self.neuralNetworkPredictor.update_model(X_train, y_train) + self.knnPredictor.update_model(X_train, y_train) def handle_underprediction(self, input_size: float, predicted: float, user_estimate: float, retry_number: int, actual_memory: float) -> float: + """ + Handles underprediction by calculating the next prediction based on the error strategy. + :param input_size: Size of the input task. + :param predicted: Predicted memory for the task. + :param user_estimate: User's estimate for the task. + :param retry_number: Number of retries attempted. + :param actual_memory: Actual memory used for the task. + + :return: Next predicted memory based on the error strategy. + """ next_pred = self._get_next_pred_for_underpred(input_size, predicted, user_estimate) if next_pred > actual_memory: @@ -185,10 +259,15 @@ def handle_underprediction(self, input_size: float, predicted: float, user_estim "original": predicted, "peak_memory": -1, "retry_strategy": self.error_strategy, "retry_number": retry_number}) - print("Failures" + str(self.failures)) + logging.debug("Failures" + str(self.failures)) return next_pred def _calculate_accuracy_score(self) -> Tuple[float, float, float, float]: + """ + Calculates the accuracy score for each predictor. + + :return: Tuple containing accuracy scores for linear, neural network, random forest, and KNN predictors. 
+ """ accuracy_lin = self.linearPredictor.model_error accuracy_nn = self.neuralNetworkPredictor.model_error accuracy_rf = self.randomForestPredictor.model_error @@ -197,6 +276,13 @@ def _calculate_accuracy_score(self) -> Tuple[float, float, float, float]: return accuracy_lin, accuracy_nn, accuracy_rf, accuracy_knn def _calculate_efficiency_score(self, X_test) -> Tuple[float, float, float, float]: + """ + Calculates the efficiency score for each predictor based on their predictions. + + :param X_test: Test features for which the efficiency scores are calculated. + + :return: Tuple containing efficiency scores for linear, neural network, random forest, and KNN predictors. + """ pred_lin = self.linearPredictor.predict_task(X_test) pred_nn = self.neuralNetworkPredictor.predict_task(X_test) pred_rf = self.randomForestPredictor.predict_task(X_test) @@ -212,6 +298,14 @@ def _calculate_efficiency_score(self, X_test) -> Tuple[float, float, float, floa return efficiency_lin, efficiency_nn, efficiency_rf, efficiency_knn def _calculate_raq_score(self, X_test, y_test) -> Tuple[float, float, float, float]: + """ + Calculates the RAQ score for each predictor based on their accuracy and efficiency scores. + + :param X_test: Test features for which the RAQ scores are calculated. + :param y_test: Test label for which the RAQ scores are calculated. + + :return: Tuple containing RAQ scores for linear, neural network, random forest, and KNN predictors. + """ accuracy_lin, accuracy_nn, accuracy_rf, accuracy_knn = self._calculate_accuracy_score() efficiency_lin, efficiency_nn, efficiency_rf, efficiency_knn = self._calculate_efficiency_score(X_test) @@ -223,6 +317,13 @@ def _calculate_raq_score(self, X_test, y_test) -> Tuple[float, float, float, flo return raq_lin, raq_nn, raq_rf, raq_knn def _check_gt0(self, prediction) -> bool: + """ + Checks if the prediction is greater than zero. + + :param prediction: The prediction value to check. 
+ + :return: True if the prediction is greater than zero, False otherwise. + """ if prediction > 0: return True @@ -230,7 +331,16 @@ def _check_gt0(self, prediction) -> bool: def _get_offset(self, offset_strategy: OFFSET_STRATEGY, default_offset: float, prediction_error: list, predictor) -> float: + """ + Calculates the offset based on the specified strategy and prediction errors. + + :param offset_strategy: Strategy for calculating the offset. + :param default_offset: Default offset value to return if no other strategy applies. + :param prediction_error: List of prediction errors to analyze. + :param predictor: The predictor object used for making predictions. + :return: Calculated offset based on the strategy. + """ if len(prediction_error) < 1: return default_offset @@ -280,17 +390,21 @@ def _get_offset(self, offset_strategy: OFFSET_STRATEGY, default_offset: float, p raise NotImplementedError('Offset strategy ' + str(offset_strategy) + ' not found.') def _get_next_pred_for_underpred(self, input_size: float, prediction: float, user_estimate: float) -> float: + """ + Determines the next prediction value based on the error strategy when an underprediction occurs. - print(input_size) - print(self.max_input_size) + :param input_size: Size of the input task. + :param prediction: Predicted memory for the task. + :param user_estimate: User's estimate for the task. + :return: Adjusted prediction value based on the error strategy. 
+ """ if self.error_strategy == ERROR_STRATEGY.DOUBLE: return prediction * 2 elif self.error_strategy == ERROR_STRATEGY.MAX_EVER_OBSERVED: if self.max_mem <= prediction: return prediction * 2 elif (self.kedall_corr.correlation > 0.25) & (input_size > self.max_input_size): - print("Kicked in") return prediction * 2 elif self.max_mem < prediction * 1.05: return prediction * 2 @@ -302,7 +416,14 @@ def _get_next_pred_for_underpred(self, input_size: float, prediction: float, use raise NotImplementedError('Underprediction strategy not found') def _select_dynamic_offset_failures(self, prediction_error, predictor): + """ + Selects the best offset strategy based on the number of failures in predictions. + :param prediction_error: List of prediction errors to analyze. + :param predictor: The predictor object used for making predictions. + + :return: Calculated offset based on the strategy that minimizes failures. + """ min_offset_strat = None min_failures = sys.maxsize @@ -318,11 +439,16 @@ def _select_dynamic_offset_failures(self, prediction_error, predictor): min_failures = failures min_offset_strat = offset_strat - print(min_offset_strat.name) return self._get_offset(min_offset_strat, 0.05, prediction_error, predictor) def next_or_same_power_of_two(self, n): + """ + Returns the next power of two greater than or equal to n. + + :param n: The number to find the next power of two for. + :return: The next power of two greater than or equal to n. + """ n = int(np.ceil(n)) if n < 0: @@ -338,7 +464,14 @@ def next_or_same_power_of_two(self, n): return power def _select_dynamic_offset_wastage(self, prediction_error, predictor): + """ + Selects the best offset strategy based on minimizing wastage in predictions. + :param prediction_error: List of prediction errors to analyze. + :param predictor: The predictor object used for making predictions. + + :return: Calculated offset based on the strategy that minimizes wastage. 
+ """ min_offset_strat = None min_wastage = sys.maxsize @@ -368,9 +501,13 @@ def _select_dynamic_offset_wastage(self, prediction_error, predictor): min_wastage = wastage min_offset_strat = offset_strat - print(min_offset_strat.name) return self._get_offset(min_offset_strat, 0.05, prediction_error, predictor) def get_number_subModels(self) -> dict[str, int]: + """ + Returns the count of each sub-model used in the prediction process. + + :return: Dictionary containing the count of each sub-model. + """ return {"lin": self.lin_counter, "nn": self.nn_counter, "rf": self.rf_counter, "knn": self.knn_counter, "max": self.max_counter, "softmax": self.softmax_counter} diff --git a/baselines/tovar.py b/baselines/tovar.py index 1688332..32ab3bb 100644 --- a/baselines/tovar.py +++ b/baselines/tovar.py @@ -17,8 +17,29 @@ class TovarPredictor(PredictionMethod): + """ + TovarPredictor is a class that implements a resource allocation prediction model. + + This class extends the `PredictionMethod` base class and provides methods for training, predicting, + and updating the model based on historical data. It uses a histogram-based approach to calculate + allocations and optimize resource usage. + + Attributes: + value_resolution (int): Resolution for the value (e.g., memory in bytes). + time_resolution (int): Resolution for the time (e.g., seconds). + maximum (Optional[int]): Maximum value seen in the accumulated data points. + values (list): List of peak resource usage values. + times (list): List of job durations. + histogram (dict): Nested dictionary storing the frequency of value-time pairs. + """ def __init__(self, value_resolution=1, time_resolution=1): + """ + Initialize the Tovar predictor with specified value and time resolution. + + :param value_resolution: Resolution for the value (e.g., memory in bytes). + :param time_resolution: Resolution for the time (e.g., seconds). 
+ """ self.value_resolution = value_resolution self.time_resolution = time_resolution @@ -31,13 +52,41 @@ def __init__(self, value_resolution=1, time_resolution=1): # Instead of x= input_size and y=peak_mem we treat x=peak_mem and y=runtimes def initial_model_training(self, memory_peaks, runtimes) -> None: + """ + Initializes the model with training data. + + :param memory_peaks: A pandas Series containing memory peaks (e.g., memory usage in bytes). + :param runtimes: A pandas Series containing runtimes (e.g., execution time in seconds). + """ for x in range(len(memory_peaks.values)): self.add_data_point(memory_peaks.values[x], runtimes.values[x]) def predict(self, X_test, y_test, user_estimate): + """ + Predicts the first allocation based on the accumulated data. + + :param X_test: Test features (not used in this implementation). + :param y_test: Test labels (not used in this implementation). + :param user_estimate: User's estimate for the allocation (not used in this implementation). + + :return: A tuple of two values, both the first allocation computed in 'waste' mode. + """ return self.first_allocation(mode='waste'), self.first_allocation(mode='waste') def handle_underprediction(self, input_size:float, predicted: float, user_estimate:float, retry_number: int, actual_memory: float): + """ + Handles underprediction scenarios by adjusting the predicted value based on the maximum seen. + If the predicted value is less than the maximum seen, it returns the maximum seen. + Otherwise, it returns a fixed value of 128GB memory. + + :param input_size: Size of the input task (not used in this implementation). + :param predicted: Predicted value from the model (e.g., memory usage in bytes). + :param user_estimate: User's estimate for the task (not used in this implementation). + :param retry_number: Number of retries attempted (not used in this implementation). + :param actual_memory: Actual memory used for the task (not used in this implementation). 
+ + :return: Adjusted predicted value based on the maximum seen. + """ if predicted < self.maximum_seen: return self.maximum_seen else: @@ -46,6 +95,12 @@ def handle_underprediction(self, input_size:float, predicted: float, user_estima # underpred. aufrufen def update_model(self, memory_peak: int, runtime: int) -> None: + """ + Updates the model with new training data by adding a new data point. + + :param memory_peak: Peak resource usage of a job (e.g., memory usage in bytes). + :param runtime: Duration of the job (e.g., execution time in seconds). + """ self.add_data_point(memory_peak,runtime) def get_number_subModels(self) -> dict[str, int]: @@ -61,6 +116,15 @@ def get_number_subModels(self) -> dict[str, int]: # @endcode @property def count(self): + """ + Returns the number of data points added to the model. + + This property calculates the total number of values stored in the `values` list, + which represents the peak resource usage data points accumulated during the model's operation. + + :return: The number of data points in the `values` list. + :rtype: int + """ return len(self.values) ## @@ -71,20 +135,32 @@ def count(self): # @endcode @property def maximum_seen(self): + """ + Returns the maximum value seen in the accumulated data points. + + This property retrieves the highest peak resource usage value stored in the `maximum` attribute, + which is updated during the addition of data points. + + :return: The maximum peak resource usage value. + :rtype: int or None + """ return self.maximum - ## - # Add a data point. - # @param self Reference to the current object. - # @param value Peak resource usage of a job. - # @param time Duration of the job. - # - # Units should be consistent across data points. - # - # @code - # print fa.add_data_point(value = 50, time = 360) - # @endcode def add_data_point(self, value, time): + """ + Adds a data point to the model. 
+ + This method updates the `values` and `times` lists with the provided data point + and organizes the data into buckets based on the specified resolutions. It also + updates the `maximum` attribute and the `histogram` dictionary to reflect the new data. + + Units should be consistent across data points. + + :param value: Peak resource usage of a job. + :param time: Duration of the job. + + :return: The updated count of occurrences for the given value-time bucket. + """ self.values.append(value) self.times.append(time) @@ -104,16 +180,18 @@ def add_data_point(self, value, time): return self.histogram[value_bucket][time_bucket] - ## - # Compute and return the first allocation. - # - # @param self Reference to the current object. - # @param mode Optimization mode. One of 'throughput', 'waste', or 'fixed'. - # - # @code - # v = fa.first_allocation(mode = 'throughput') - # @endcode def first_allocation(self, mode='throughput'): + """ + Computes and returns the first allocation based on the optimization mode. + + This method calculates the first allocation using one of three modes: 'throughput', + 'waste', or 'fixed'. The allocation is determined based on the accumulated data points + and the selected optimization strategy. + + :param mode: Optimization mode. One of 'throughput', 'waste', or 'fixed'. + + :return: The computed allocation value based on the selected mode. + """ valid_modes = ['throughput', 'waste', 'fixed'] if mode == 'fixed': @@ -125,13 +203,18 @@ def first_allocation(self, mode='throughput'): else: raise ValueError('mode not one of %s', ','.join(valid_modes)) - ## - # Return the waste (unit x time) that would be produced if the accumulated - # values were run under the given allocation. - # - # @param self Reference to the current object. - # @param allocation Value of allocation to test. def waste(self, allocation): + """ + Calculates the waste produced under a given allocation. 
+ + This method computes the total waste (unit x time) that would result if the accumulated + values were run under the specified allocation. Waste is calculated based on the difference + between the allocation and the actual resource usage. + + :param allocation: Value of allocation to test. + + :return: The total waste produced under the given allocation. + """ waste = 0 for i in range(0, len(self.values)): v = self.values[i] @@ -143,12 +226,15 @@ def waste(self, allocation): waste += t * (allocation + self.maximum_seen - v) return waste - ## - # Return the usage (unit x time) if the accumulated values were run under - # the given allocation. - # - # @param self Reference to the current object. def usage(self): + """ + Calculates the total resource usage under the accumulated data points. + + This method computes the total usage (unit x time) based on the accumulated + values and times stored in the model. + + :return: The total resource usage (unit x time). + """ usage = 0 for i in range(0, len(self.values)): v = self.values[i] @@ -156,25 +242,34 @@ def usage(self): usage += t * v return usage - ## - # Return the percentage of wasted resources that would be produced if the accumulated - # values were run under the given allocation. - # - # @param self Reference to the current object. - # @param allocation Value of allocation to test. def wastepercentage(self, allocation): + """ + Calculates the percentage of wasted resources under a given allocation. + + This method computes the percentage of wasted resources based on the total waste + and usage for the accumulated data points under the specified allocation. + + :param allocation: Value of allocation to test. + + :return: The percentage of wasted resources. + """ waste = self.waste(allocation) usage = self.usage() return (100.0 * waste) / (waste + usage) - ## - # Return the throughput of a single node if the accumulated values values - # were run under the given allocation. Assumes an infinite number of tasks. 
- # - # @param self Reference to the current object. - # @param allocation Value of allocation to test. def throughput(self, allocation): + """ + Calculates the throughput of a single node under a given allocation. + + This method computes the throughput (tasks per unit time) for a single node + based on the accumulated data points and the specified allocation. It assumes + an infinite number of tasks. + + :param allocation: Value of allocation to test. + + :return: The throughput of a single node under the given allocation. + """ maximum = float(self.maximum_seen) tasks = 0 @@ -193,13 +288,18 @@ def throughput(self, allocation): return tasks / total_time - ## - # Return the number of tasks that would be retried if the accumulated - # values were run under the given allocation. - # - # @param self Reference to the current object. - # @param allocation Value of allocation to test. def retries(self, allocation): + """ + Calculates the number of tasks that would be retried under a given allocation. + + This method computes the number of tasks that would require retries if the accumulated + values were run under the specified allocation. A retry is required if the resource usage + exceeds the allocation. + + :param allocation: Value of allocation to test. + + :return: The number of tasks that would be retried. + """ retries = 0 for v in self.values: if v > allocation: @@ -207,6 +307,15 @@ def retries(self, allocation): return retries def __first_allocation_by_waste(self): + """ + Computes the first allocation based on the waste optimization strategy. + + This private method calculates the first allocation by minimizing the expected waste + using the accumulated data points. It uses a mathematical model to determine the optimal + allocation value. + + :return: The computed allocation value based on the waste optimization strategy. + """ values = self.histogram.keys() # computation below is easier if values are sorted in reversed. 
@@ -242,6 +351,15 @@ def __first_allocation_by_waste(self): return a def __first_allocation_by_throughput(self): + """ + Computes the first allocation based on the throughput optimization strategy. + + This private method calculates the first allocation by maximizing the expected throughput + using the accumulated data points. It uses a mathematical model to determine the optimal + allocation value. + + :return: The computed allocation value based on the throughput optimization strategy. + """ values = self.histogram.keys() # computation below is easier if values are sorted in reversed. @@ -286,12 +404,32 @@ def __first_allocation_by_throughput(self): return a def __count_of_value(self, value): + """ + Counts the occurrences of a specific value in the histogram. + + This private method calculates the total count of occurrences for a given value + across all time buckets in the histogram. + + :param value: The value to count occurrences for. + + :return: The total count of occurrences for the given value. + """ count = 0 for time in self.histogram[value].keys(): count += self.histogram[value][time] return count def __accum_times_per_value(self, value): + """ + Accumulates the total time for a specific value in the histogram. + + This private method calculates the total time associated with a given value + across all time buckets in the histogram. + + :param value: The value to accumulate time for. + + :return: The total accumulated time for the given value. + """ total_time = 0 for time in self.histogram[value].keys(): count = self.histogram[value][time] diff --git a/baselines/wastage.py b/baselines/wastage.py index 05144fe..8120391 100644 --- a/baselines/wastage.py +++ b/baselines/wastage.py @@ -1,7 +1,7 @@ """ - Compute over- and undersizing wastage for different failure handling strategies for individual jobs. - The attempt sequence for each job is derived from its real memory usage, the first allocation, and the failure handling strategy. 
- Each wastage function returns the over-/undersizing wastage of the entire attempt sequence of a job. +Compute over- and undersizing wastage for different failure handling strategies for individual jobs. +The attempt sequence for each job is derived from its real memory usage, the first allocation, and the failure handling strategy. +Each wastage function returns the over-/undersizing wastage of the entire attempt sequence of a job. """ import math from collections import namedtuple @@ -17,21 +17,67 @@ class ModelParameters: + """ + Represents the parameters of a model, including slope, intercept, base, and quadratic terms. + + Attributes: + slope (float): The slope of the model. + intercept (float): The intercept of the model. + base (Optional[float]): The base value for the model (default is None). + quadratic (Optional[float]): The quadratic term for the model (default is None). + """ + def __init__(self, slope: float, intercept: float, base: Optional[float] = None, quadratic: Optional[float] = None): + """ + Initializes the ModelParameters object. + + :param slope: The slope of the model. + :param intercept: The intercept of the model. + :param base: The base value for the model (default is None). + :param quadratic: The quadratic term for the model (default is None). + """ self.slope = slope self.intercept = intercept self.base = base self.quadratic = quadratic def __str__(self): + """ + Returns a string representation of the model parameters. + + :return: A formatted string containing the model parameters. + """ return "quadratic {:.2f} linear {:.2f} intercept {:.2f} base {:.2f}".format( self.quadratic if self.quadratic is not None else np.nan, self.slope, self.intercept, self.base if self.base is not None else np.nan) class Wastage: + """ + Represents wastage metrics for resource allocation. + + Attributes: + usage (float): Total resource usage. + oversizing (float): Amount of oversizing wastage. + undersizing (float): Amount of undersizing wastage. 
+ failures (int): Number of failed attempts. + wastage_GB (float): Total wastage in gigabytes. + wastage_GBh (float): Total wastage in gigabyte-hours. + runtime_h (float): Total runtime in hours. + """ + def __init__(self, usage: float, oversizing: float, undersizing: float, failures: int, wastage_GB: float, runtime_h: float): + """ + Initializes the Wastage object. + + :param usage: Total resource usage. + :param oversizing: Amount of oversizing wastage. + :param undersizing: Amount of undersizing wastage. + :param failures: Number of failed attempts. + :param wastage_GB: Total wastage in gigabytes. + :param runtime_h: Total runtime in hours. + """ assert usage >= 0, "Usage must be > 0, is {}".format(usage) self.usage = usage self.oversizing = oversizing @@ -43,9 +89,19 @@ def __init__(self, usage: float, oversizing: float, undersizing: float, failures @property def maq(self): + """ + Calculates the Memory Allocation Quality (MAQ). + + :return: The MAQ value as a ratio of usage to total allocation. + """ return self.usage / (self.usage + self.oversizing + self.undersizing) def __str__(self): + """ + Returns a string representation of the wastage metrics. + + :return: A formatted string containing the wastage metrics. + """ return ("MAQ {:.2f}% oversizing {:.1f}% undersizing {:.1f}% failures {} wastage {:.2f}\n".format( self.maq * 100, self.oversizing / (self.usage + self.oversizing + self.undersizing) * 100, @@ -56,17 +112,25 @@ def __str__(self): def exponential(df: pd.DataFrame, relative_ttf: float, resource_column, first_allocation_column, run_time_column, base: float = 2, workflow: str = "default") -> "Wastage": """ - Compute the wastage under the assumption that allocated resources are multiplied by `base` after each failed attempt. - :param df: needs a column 'first_allocation_column' containing the allocated resources for the first attempt of a task - :param relative_ttf: the assumed relative time to failure in case of insufficient resources. 
Can be larger than 1 but has to be larger than 0. - :param base: The multiplier to apply to the resource allocation of a failed attempt. - :param resource_column: The name of the pandas dataframe column containing the resource usage values. - :param first_allocation_column: Column containing the amount of resources allocated to the first attempt of each job. - :param run_time_column: Column containing the execution duration of each job. - :return: - """ - + Computes the wastage under the assumption that allocated resources are multiplied by `base` after each failed attempt. + + :param df: DataFrame containing job data with columns for resource usage, first allocation, and runtime. + :param relative_ttf: Assumed relative time to failure in case of insufficient resources. + :param resource_column: Column name for resource usage values. + :param first_allocation_column: Column name for first allocation values. + :param run_time_column: Column name for job runtime values. + :param base: Multiplier for resource allocation after a failed attempt. + :param workflow: Workflow name (default is "default"). + :return: A Wastage object containing wastage metrics. + """ def generate_prediction_series(a, count): + """ + Generates a series of predictions based on allocation and count. + + :param a: Allocation value. + :param count: Number of predictions to generate. + :return: List of predictions. 
+ """ lst = [] for i in range(1, int(count) + 1): lst.append(a * i) @@ -84,7 +148,7 @@ def generate_prediction_series(a, count): df["Wastage_GBh"] = oversizing + undersizing * 0.000000001 / 3600000 runtime_h = (base ** k * df[run_time_column]).sum() / 3600000.0 res = [generate_prediction_series(b, a) for a, b in zip(k + 1, df[first_allocation_column])] - res = pd.Series(res) + res = pd.Series(res) df.reset_index(drop=True, inplace=True) df["Predictions"] = res @@ -96,15 +160,18 @@ def generate_prediction_series(a, count): runtime_h=runtime_h, wastage_GB=wastage_GB_total.sum() * 0.000000001) - def failed_attempts_exponential(base: float, real_usage: float, first_allocation: float) -> int: """ - For exponential strategy only. - :param base: base for exponential increase of allocation sizes - :param real_usage: real memory usage - :param first_allocation: memory allocated to first attempt - :return: number of failed attempts before success - """ + Calculate the number of failed attempts before success using an exponential strategy. + + This function computes the number of failed attempts required to allocate sufficient resources + for a job, based on the exponential increase of allocation sizes. + + :param base: The base for exponential increase of allocation sizes. Must be greater than 1. + :param real_usage: The actual memory usage of the job. Must be greater than 0. + :param first_allocation: The memory allocated for the first attempt. Must be greater than 0. + :return: The number of failed attempts before success. 
+ """ assert first_allocation > 0, "first_allocation = {}, must be > 0".format(first_allocation) assert base > 1, "base = {}, must be > 1".format(base) @@ -113,12 +180,17 @@ def failed_attempts_exponential(base: float, real_usage: float, first_allocation def oversizing_wastage_exponential(real_usage: float, run_time: float, first_allocation: float, base: float) -> float: """ - :param real_usage: true resource usage - :param run_time: job runtime - :param allocation: the first attempt - :param the base for increment - :return: - """ + Calculate the oversizing wastage for a job using an exponential strategy. + + This function computes the wastage caused by allocating more resources than required + during the job's runtime, based on the exponential increase of allocation sizes. + + :param real_usage: The actual memory usage of the job. Must be greater than 0. + :param run_time: The runtime of the job. Must be greater than 0. + :param first_allocation: The memory allocated for the first attempt. Must be greater than 0. + :param base: The base for exponential increase of allocation sizes. Must be greater than 1. + :return: The oversizing wastage (memory x time). 
+ """ assert first_allocation > 0, "first_allocation = {}, must be > 0".format(first_allocation) assert run_time > 0, "run_time = {}, must be > 0".format(run_time) assert real_usage > 0, "real_usage = {}, must be > 0".format(real_usage) @@ -131,12 +203,17 @@ def oversizing_wastage_exponential(real_usage: float, run_time: float, first_all def undersizing_wastage_exponential(real_usage: float, abs_ttf: float, first_allocation: float, base: float) -> ( float, int): """ - :param real_usage: true memory usage - :param abs_ttf: job execution time (time to failure) when executed with insufficient resources - :param first_allocation: the first attempt - :param base: the base for exponential increase of allocation sizes - :return: - """ + Calculate the undersizing wastage and the number of failed attempts for a job using an exponential strategy. + + This function computes the wastage caused by allocating insufficient resources during the job's runtime, + as well as the number of failed attempts required to allocate sufficient resources. + + :param real_usage: The actual memory usage of the job. Must be greater than 0. + :param abs_ttf: The absolute time to failure when executed with insufficient resources. Must be greater than 0. + :param first_allocation: The memory allocated for the first attempt. Must be greater than 0. + :param base: The base for exponential increase of allocation sizes. Must be greater than 1. + :return: A tuple containing the undersizing wastage (memory x time) and the number of failed attempts. + """ assert first_allocation > 0 assert abs_ttf > 0 assert real_usage > 0 @@ -149,22 +226,35 @@ def undersizing_wastage_exponential(real_usage: float, abs_ttf: float, first_all def oversizing_wastage_2step(real_usage: float, run_time: float, first_allocation: float, max_allocation: float) -> float: """ - This can be used when the maximum is known. 
Shouldn't be used except for demonstration purposes (Regression example figure in the paper, see TovarWastageFigure.py) - :param real_usage: real memory usage - :param first_allocation: memory allocated to first attempt - :param max_allocation: memory allocated to final attempt - """ + Calculate the oversizing wastage for a job using a two-step strategy. + + This function computes the wastage caused by allocating more resources than required + during the job's runtime. It uses a two-step strategy where the allocation is either + the first attempt or the maximum allocation. + + :param real_usage: The actual memory usage of the job. Must be greater than 0. + :param run_time: The runtime of the job. Must be greater than 0. + :param first_allocation: The memory allocated for the first attempt. Must be greater than 0. + :param max_allocation: The memory allocated for the final attempt. Must be greater than 0. + :return: The oversizing wastage (memory x time). + """ allocation = first_allocation if first_allocation >= real_usage else max_allocation return (allocation - real_usage) * run_time def undersizing_wastage_2step(real_usage: float, abs_ttf: float, first_allocation: float) -> float: """ - This can be used when the maximum is known. Shouldn't be used except for demonstration purposes (Regression example figure in the paper, see TovarWastageFigure.py) - :param real_usage: real memory usage - :param first_allocation: memory allocated to first attempt - :param abs_ttf: job execution time (time to failure) when executed with insufficient resources - """ + Calculate the undersizing wastage for a job using a two-step strategy. + + This function computes the wastage caused by allocating insufficient resources + during the job's runtime. It uses a two-step strategy where the allocation is + compared to the actual memory usage. + + :param real_usage: The actual memory usage of the job. Must be greater than 0. 
+ :param abs_ttf: The absolute time to failure when executed with insufficient resources. Must be greater than 0. + :param first_allocation: The memory allocated for the first attempt. Must be greater than 0. + :return: The undersizing wastage (memory x time). + """ if first_allocation >= real_usage: return 0 @@ -175,17 +265,22 @@ def wastage_3step(df: pd.DataFrame, max_seen_so_far: float, max_available: float eps: float = 1e-4, resource_column='rss', first_allocation_column='first_allocation', run_time_column='run_time') -> Wastage: """ - Computes wastage with maximum failure handling strategy by summing up wastage for every job in a set - :param df: data frame containing the actual resource usage, execution duration, etc. of tasks - :param max_seen_so_far: - :param max_available: - :param relative_ttf: - :param eps: tolerance when comparing allocations - :param resource_column: the name of the column to read the actual resource usage from - :param first_allocation_column: column that contains the first allocation - :param run_time_column: column that contains the execution duration - :return: - """ + Compute wastage using a three-step failure handling strategy. + + This function calculates the oversizing and undersizing wastage for a set of jobs + based on their resource usage, execution duration, and allocation strategy. It uses + a three-step strategy where allocations are adjusted after each failure. + + :param df: DataFrame containing job data with columns for resource usage, first allocation, and runtime. + :param max_seen_so_far: The maximum allocation observed so far. + :param max_available: The maximum allocation available. + :param relative_ttf: The relative time to failure in case of insufficient resources. + :param eps: Tolerance when comparing allocations (default is 1e-4). + :param resource_column: Column name for resource usage values (default is 'rss'). 
+ :param first_allocation_column: Column name for first allocation values (default is 'first_allocation'). + :param run_time_column: Column name for job runtime values (default is 'run_time'). + :return: A Wastage object containing wastage metrics. + """ assert len(df) > 0 success_on_first_attempt = df[resource_column] <= df[first_allocation_column] + eps @@ -236,12 +331,20 @@ def wastage_3step(df: pd.DataFrame, max_seen_so_far: float, max_available: float def wastage_exponential(df: pd.DataFrame, relative_ttf: float, base: float, resource_column='rss', first_allocation_column='first_allocation', run_time_column='run_time') -> Wastage: """ - Compute the wastage under the assumption that after each failure another attempt is tried with base*previous allocation - :param df: needs a column 'first_allocation_column' containing the allocated memory for the first attempt of a task - :param relative_ttf: - :param base: - :return: - """ + Compute the wastage under the assumption that after each failure another attempt is tried with base * previous allocation. + + This function calculates the oversizing and undersizing wastage for a set of jobs based on their resource usage, + execution duration, and allocation strategy. It uses an exponential strategy where allocations are adjusted + after each failure. + + :param df: DataFrame containing job data with columns for resource usage, first allocation, and runtime. + :param relative_ttf: Relative time to failure in case of insufficient resources. + :param base: Multiplier for resource allocation after a failed attempt. Must be greater than 1. + :param resource_column: Column name for resource usage values (default is 'rss'). + :param first_allocation_column: Column name for first allocation values (default is 'first_allocation'). + :param run_time_column: Column name for job runtime values (default is 'run_time'). + :return: A Wastage object containing wastage metrics. 
+ """ assert len(df) > 0 k = np.clip(np.ceil(np.log(df[resource_column] / df[first_allocation_column]) / np.log(base)), a_min=0, a_max=None) @@ -256,12 +359,19 @@ def wastage_exponential(df: pd.DataFrame, relative_ttf: float, base: float, reso def wastage_exponential_prop_ttf(df: pd.DataFrame, base: float, resource_column='rss', first_allocation_column='first_allocation', run_time_column='run_time') -> Wastage: """ - Like wastage_exponential, but assume that the time to failure is proportional to the prediction error. - E.g., when allocting 1 GB to a 10 GB task, time to failure is 1/10, whereas for 9 GB it's 0.9. - :param df: needs a column 'first_allocation_column' containing the allocated memory for the first attempt of a task - :param base: - :return: - """ + Compute wastage under the assumption that the time to failure is proportional to the prediction error. + + This function calculates the oversizing and undersizing wastage for a set of jobs based on their resource usage, + execution duration, and allocation strategy. It assumes that the time to failure is proportional to the error + in resource allocation. + + :param df: DataFrame containing job data with columns for resource usage, first allocation, and runtime. + :param base: Multiplier for resource allocation after a failed attempt. Must be greater than 1. + :param resource_column: Column name for resource usage values (default is 'rss'). + :param first_allocation_column: Column name for first allocation values (default is 'first_allocation'). + :param run_time_column: Column name for job runtime values (default is 'run_time'). + :return: A Wastage object containing wastage metrics. 
+ """ assert len(df) > 0 k = np.clip(np.ceil(np.log(df[resource_column] / df[first_allocation_column]) / np.log(base)), a_min=0, a_max=None) @@ -275,6 +385,17 @@ def wastage_exponential_prop_ttf(df: pd.DataFrame, base: float, resource_column= def wastage_simple(df: pd.DataFrame, resource_column='rss', first_allocation_column='first_allocation'): + """ + Compute wastage using a simple failure handling strategy. + + This function calculates the oversizing and undersizing wastage for a set of jobs based on their resource usage + and allocation strategy. It uses a simple strategy where allocations are compared directly to resource usage. + + :param df: DataFrame containing job data with columns for resource usage and first allocation. + :param resource_column: Column name for resource usage values (default is 'rss'). + :param first_allocation_column: Column name for first allocation values (default is 'first_allocation'). + :return: A Wastage object containing wastage metrics. + """ # todo this can be simplified. undersizing = np.select([ df[first_allocation_column] < df[resource_column], diff --git a/baselines/witt_feedback_based.py b/baselines/witt_feedback_based.py index cb2bed2..441ee8b 100644 --- a/baselines/witt_feedback_based.py +++ b/baselines/witt_feedback_based.py @@ -10,34 +10,99 @@ class WittPercentilePredictor(PredictionMethod): + """ + A prediction method that uses the 95th percentile of training data for predictions. + + Attributes: + percentile (float): The percentile value used for predictions (default is 0.95). + y_train_full (Optional[np.ndarray]): Full training target values. + X_train_full (Optional[np.ndarray]): Full training feature values. + """ + percentile = 0.95 def __init__(self): + """ + Initializes the WittPercentilePredictor object with empty training data. + """ self.y_train_full = None self.X_train_full = None def initial_model_training(self, X_train, y_train) -> None: + """ + Stores the initial training data. 
+ + :param X_train: Training feature data. + :param y_train: Training target data. + """ self.X_train_full = X_train self.y_train_full = y_train def predict(self, X_test, y_test, user_estimate): + """ + Predicts the 95th percentile value of the training target data. + + :param X_test: Test feature data (unused in this method). + :param y_test: Test target data (unused in this method). + :param user_estimate: User-provided estimate (unused in this method). + :return: A tuple containing the 95th percentile value twice. + """ return np.percentile(self.y_train_full, q=95), np.percentile(self.y_train_full, q=95) - def handle_underprediction(self, input_size:float, predicted: float, user_estimate: float, retry_number: int, actual_memory: float): + def handle_underprediction(self, input_size: float, predicted: float, user_estimate: float, retry_number: int, actual_memory: float): + """ + Handles underprediction by doubling the predicted value. + + :param input_size: Size of the input data (unused in this method). + :param predicted: Predicted value. + :param user_estimate: User-provided estimate (unused in this method). + :param retry_number: Retry attempt number (unused in this method). + :param actual_memory: Actual memory usage (unused in this method). + :return: The predicted value multiplied by 2. + """ return predicted * 2 def update_model(self, X_train: pd.Series, y_train: float) -> None: + """ + Updates the model with new training data. + + :param X_train: New training feature data. + :param y_train: New training target data. + """ self.X_train_full = np.concatenate((self.X_train_full, [X_train])) self.y_train_full = np.concatenate((self.y_train_full, pd.Series(np.array([y_train])))) def get_number_subModels(self) -> dict[str, int]: + """ + Returns the number of sub-models used in the predictor. + + :return: An empty dictionary indicating no sub-models are used. 
+ """ return {} class WittRegressionPredictor(PredictionMethod): + """ + A prediction method that uses linear regression to predict resource usage. + + Attributes: + pred_err_wittLR (list): List of prediction errors for tracking. + regressor (LinearRegression): Linear regression model for predictions. + train_X_scaler (MinMaxScaler): Scaler for training feature data. + train_y_scaler (MinMaxScaler): Scaler for training target data. + y_train_full (np.ndarray): Full training target values. + X_train_full (np.ndarray): Full training feature values. + offset_strategy (OFFSET_STRATEGY): Strategy for handling prediction offsets. + """ + pred_err_wittLR = [] def __init__(self, offset_strategy: OFFSET_STRATEGY): + """ + Initializes the WittRegressionPredictor object. + + :param offset_strategy: Strategy for handling prediction offsets. + """ self.regressor = None self.train_X_scaler = None self.train_y_scaler = None @@ -46,6 +111,12 @@ def __init__(self, offset_strategy: OFFSET_STRATEGY): self.offset_strategy = offset_strategy def initial_model_training(self, X_train, y_train) -> None: + """ + Trains the initial regression model using scaled training data. + + :param X_train: Training feature data. + :param y_train: Training target data. + """ self.train_X_scaler = MinMaxScaler() self.train_y_scaler = MinMaxScaler() @@ -61,6 +132,14 @@ def initial_model_training(self, X_train, y_train) -> None: self.regressor = LinearRegression().fit(X_train_scaled, y_train_scaled) def predict(self, X_test, y_test, user_estimate): + """ + Predicts resource usage using the trained regression model. + + :param X_test: Test feature data. + :param y_test: Test target data. + :param user_estimate: User-provided estimate (unused in this method). + :return: A tuple containing the adjusted prediction and the raw prediction. 
+ """ task_features_scaled = self.train_X_scaler.transform(X_test.values.reshape(-1, 1)) prediction = self.train_y_scaler.inverse_transform(self.regressor.predict(task_features_scaled).reshape(-1, 1)) offset = self._get_offset(self.offset_strategy, self.pred_err_wittLR) @@ -68,6 +147,12 @@ def predict(self, X_test, y_test, user_estimate): return prediction + offset * prediction, prediction def update_model(self, X_train: pd.Series, y_train: float): + """ + Updates the regression model with new training data. + + :param X_train: New training feature data. + :param y_train: New training target data. + """ self.X_train_full = np.concatenate((self.X_train_full, [X_train])) self.y_train_full = np.concatenate((self.y_train_full, np.array([y_train]).reshape(-1, 1))) @@ -79,14 +164,30 @@ def update_model(self, X_train: pd.Series, y_train: float): self.regressor.fit(self.train_X_scaler.transform(self.X_train_full), self.train_y_scaler.transform(self.y_train_full)) - def handle_underprediction(self, input_size:float, predicted: float, user_estimate: float, retry_number: int, actual_memory: float): + def handle_underprediction(self, input_size: float, predicted: float, user_estimate: float, retry_number: int, actual_memory: float): + """ + Handles underprediction by adjusting the predicted value. + + :param input_size: Size of the input data (unused in this method). + :param predicted: Predicted value. + :param user_estimate: User-provided estimate (unused in this method). + :param retry_number: Retry attempt number (unused in this method). + :param actual_memory: Actual memory usage (unused in this method). + :return: Adjusted predicted value. + """ if predicted < 0: return max(self.y_train_full)[0] else: return predicted * 2 def _get_offset(self, offset_strategy: OFFSET_STRATEGY, prediction_error: list): + """ + Computes the offset for predictions based on the specified strategy. + :param offset_strategy: Strategy for handling prediction offsets. 
+ :param prediction_error: List of prediction errors. + :return: Computed offset value. + """ if offset_strategy == OFFSET_STRATEGY.STD: if len(prediction_error) > 0: return np.std(prediction_error) @@ -106,10 +207,21 @@ def _get_offset(self, offset_strategy: OFFSET_STRATEGY, prediction_error: list): raise NotImplementedError('Something did not work here.') def _check_gt0(self, prediction) -> bool: + """ + Checks if a prediction error is greater than zero. + + :param prediction: Prediction error value. + :return: True if the prediction error is greater than zero, False otherwise. + """ if prediction > 0: return True return False def get_number_subModels(self) -> dict[str, int]: + """ + Returns the number of sub-models used in the predictor. + + :return: An empty dictionary indicating no sub-models are used. + """ return {} diff --git a/baselines/witt_low_wastage.py b/baselines/witt_low_wastage.py index 6b16fe3..47956fd 100644 --- a/baselines/witt_low_wastage.py +++ b/baselines/witt_low_wastage.py @@ -9,6 +9,7 @@ from typing import Callable, List, Optional import pandas as pd import numpy as np +import logging import scipy.optimize as spo import scipy.stats as sps import statsmodels.formula.api as smf @@ -23,12 +24,42 @@ class LowWastageRegression: - """ If the multiplier for failed attempts is optimized, limit it to this value, e.g., increase allocation by at least 50% upon each failure.""" + """ + A class to compute wastage-minimizing first allocations for jobs given historical data about their resource usage. + + This class normalizes training data, trains multiple models, and predicts resource allocations + using an ensemble of trained models. + + Attributes: + min_base (float): Minimum multiplier for failed attempts, e.g., increase allocation by at least 50% upon each failure. + min_allocation (float): Minimum allocation for resources. + relative_time_to_failure (float): Relative time to failure for resource allocation. 
+ predictor_column (str): Column name for predictor variable in the training data. + resource_column (str): Column name for resource usage variable in the training data. + run_time_column (str): Column name for runtime variable in the training data. + prediction_column (str): Column name for predicted allocations. + shift (dict): Dictionary storing the shift values for normalization. + scale (dict): Dictionary storing the scale values for normalization. + initial_ptp (dict): Dictionary storing the peak-to-peak values for normalization. + unscaled_mean_predictor (float): Mean value of the predictor column before scaling. + data (pd.DataFrame): Normalized training data. + models (list): List of trained models. + """ + min_base = 1.5 def __init__(self, training_data: pd.DataFrame, predictor_column: str, resource_column: str, run_time_column: str, relative_time_to_failure: float, min_allocation: float): - + """ + Initializes the LowWastageRegression object and trains multiple models. + + :param training_data: DataFrame containing historical resource usage data. + :param predictor_column: Column name for predictor variable in the training data. + :param resource_column: Column name for resource usage variable in the training data. + :param run_time_column: Column name for runtime variable in the training data. + :param relative_time_to_failure: Relative time to failure for resource allocation. + :param min_allocation: Minimum allocation for resources. + """ self.min_allocation = min_allocation self.relative_time_to_failure = relative_time_to_failure @@ -60,9 +91,15 @@ def __init__(self, training_data: pd.DataFrame, predictor_column: str, resource_ self.training_data = self.data.sample(frac=0.7, random_state=i) self.models.append(self.__train__(optimize_base=False)) # print(self.models[1.0][0].slope) - print(self.models) + logging.debug(self.models) def predict(self, data: pd.DataFrame): + """ + Predicts resource allocations for the given data using the trained models. 
+ + :param data: DataFrame containing data for prediction. + :return: Series containing predicted allocations. + """ df = data.copy() self.__transform__(df) @@ -76,24 +113,45 @@ def predict(self, data: pd.DataFrame): return df[self.prediction_column] def __transform__(self, data: pd.DataFrame): + """ + Normalizes the given data using the stored shift and scale values. + + :param data: DataFrame to be normalized. + """ for column in [self.predictor_column, self.resource_column]: data[column] = (data[column] - self.shift[column]) / self.scale[column] def __inverse_transform__(self, data: pd.DataFrame): + """ + Reverts the normalization of the given data. + + :param data: DataFrame to be reverted to original scale. + """ for column in [self.predictor_column, self.resource_column]: data[column] = data[column] * self.scale[column] + self.shift[column] data[self.prediction_column] = data[self.prediction_column] * self.scale[self.resource_column] + self.shift[ self.resource_column] def __predictor_varies_enough__(self): + """ + Checks if the predictor variable varies enough to justify training. + + :return: True if the predictor variable varies sufficiently, False otherwise. + """ return self.initial_ptp[self.predictor_column] > 0.05 * self.unscaled_mean_predictor def __quantile_regression__(self, steps: int = 5, max_iter=50): """ - Compute slopes and intercepts approximately (low number of iterations) corresponding to different quantile regression lines. - Uses a quadratic interpolation between 0.5 (median) and 0.9999 to obtain more candidates at the high end of the range. - """ + Compute slopes and intercepts approximately using quantile regression. + + This method calculates regression lines corresponding to different quantiles of the data. + It uses quadratic interpolation between 0.5 (median) and 0.9999 to obtain more candidates + at the high end of the range. + :param steps: Number of quantile steps to compute (default is 5). 
+ :param max_iter: Maximum number of iterations for the quantile regression fitting (default is 50). + :return: A list of LinearModel objects representing the computed regression lines. + """ # e.g., [0.99, 0.91, 0.75, 0.51] for steps = 4 quantile_candidates = [1 - a ** 2 for a in np.linspace(0.01, 0.7, steps)] @@ -124,7 +182,15 @@ def __quantile_regression__(self, steps: int = 5, max_iter=50): return parameters_tried def __train__(self, optimize_base: bool): + """ + Train the model using linear regression. + + Training is currently delegated to `__train_linear__` whether or not the predictor varies enough; + the quantile-regression alternative is disabled because `__train_quantile__` is an unimplemented placeholder. + :param optimize_base: Whether to optimize the base parameter during training. + :return: The best model parameters and their associated wastage. + """ if not self.__predictor_varies_enough__(): # return self.__train_quantile__() NOT commenting this out and substituting it leads to a failure because methods returns NONE return self.__train_linear__(optimize_base=optimize_base) @@ -132,19 +198,24 @@ def __train__(self, optimize_base: bool): return self.__train_linear__(optimize_base=optimize_base) def __train_quantile__(self): + """ + Placeholder method for quantile regression training. + + This method is currently not implemented. 
+ """ pass def __train_linear__(self, optimize_base: bool = False, max_iter_cobyla=200): """ - Use Constrained Optimization by Linear Approximation to find a good slope, intercept, and optionally, base - :param data: Needs the following columns 'rss' (peak memory usage), 'run_time', 'input_size' (zero allowed, but not NaN) - :param initial_solution: A function that returns initial model parameters from a training data set (e.g., initial_solution_zero_max or initial_solution_99percentile) - :param wastage_func: Computes the over- and under-sizing wastage for a given first allocation (set column 'first_allocation' on the data frame) - :param optimize_base: - :param exponential_base: Base for exponential failure handling strategy. If None given, the base is optimized for as well. (E.g., double allocation after each failure, add 50%, etc.) - :param min_allocation: Minimum memory to allocate to a task. Could be optimized as well. - :return: the best found model parameters, the according wastage, the wastage function (needed for evaluation set, and changes during optimization if base is not specified), the tried model parameters, and the resulting wastages - """ + Train the model using linear regression with optional base optimization. + + This method uses Constrained Optimization by Linear Approximation (COBYLA) to find + the best slope, intercept, and optionally, base for the linear model. + + :param optimize_base: Whether to optimize the base parameter during training (default is False). + :param max_iter_cobyla: Maximum number of iterations for the COBYLA optimizer (default is 200). + :return: The best model parameters and their associated wastage. 
+ """ parameters_tried = [] wastages_tried = [] @@ -172,6 +243,12 @@ def __train_linear__(self, optimize_base: bool = False, max_iter_cobyla=200): base_constraints = ({'type': 'ineq', 'fun': lambda x: x[2] - self.min_base}) if optimize_base else () def wastage(model_params: [float]): + """ + Compute the wastage for a given set of model parameters. + + :param model_params: List containing slope, intercept, and optionally base. + :return: Total wastage (oversizing + undersizing). + """ params = self.__linear_model__(slope=model_params[0], intercept=model_params[1], base=model_params[2] if optimize_base else 2) @@ -207,21 +284,58 @@ def wastage(model_params: [float]): return best_parameters, lowest_wastage def __linear_model__(self, slope, intercept, base): + """ + Create a LinearModel object with the specified parameters. + + :param slope: Slope of the linear model. + :param intercept: Intercept of the linear model. + :param base: Base for exponential failure handling strategy. + :return: A LinearModel object. + """ return LinearModel(slope=slope, intercept=intercept, base=base, predictor_column=self.predictor_column, min_allocation=self.min_allocation) @property def model(self): + """ + Get the best model based on quality. + + :return: The LinearModel object with the highest quality. + """ return max(self.models, key=lambda m: m[1].maq)[0] @property def quality(self): + """ + Get the quality of the best model. + + :return: The quality object associated with the best model. + """ return max(self.models, key=lambda m: m[1].maq)[1] class LinearModel: + """ + Represents a linear model for resource allocation predictions. + + Attributes: + slope (float): The slope of the linear model. + intercept (float): The intercept of the linear model. + predictor_column (Optional[str]): The column name of the predictor variable in the data. + base (Optional[float]): The base value for exponential failure handling strategy. 
+ min_allocation (Optional[float]): The minimum allocation value to ensure resources are not under-allocated. + """ def __init__(self, slope: float, intercept: float, predictor_column: Optional[str] = None, base: Optional[float] = None, min_allocation: Optional[float] = None): + """ + Initializes the LinearModel object with the specified parameters. + + :param slope: The slope of the linear model. + :param intercept: The intercept of the linear model. + :param predictor_column: The column name of the predictor variable in the data. + :param base: The base value for exponential failure handling strategy. + :param min_allocation: The minimum allocation value to ensure resources are not under-allocated. + """ self.slope = slope self.intercept = intercept self.min_allocation = min_allocation @@ -229,15 +343,38 @@ def __init__(self, slope: float, intercept: float, predictor_column: Optional[st self.predictor_column = predictor_column def apply(self, data: pd.DataFrame): + """ + Applies the linear model to the given data to compute resource allocations. + + :param data: A pandas DataFrame containing the predictor column. + :return: A numpy array of computed resource allocations, clipped to ensure a minimum allocation. + """ return np.clip(data[self.predictor_column] * self.slope + self.intercept, a_min=self.min_allocation, a_max=None) def __str__(self): + """ + Returns a string representation of the LinearModel object. + + :return: A formatted string describing the slope, intercept, base, and minimum allocation of the model. 
+ """ return "slope {:.2f} intercept {:.2f} base {:.2f} minimum allocation {:.2f}".format(self.slope, self.intercept, self.base, self.min_allocation) -def main_witt_wastage(workflow: str, seed: int, error_metric: str, alpha: float, use_softmax: bool): +def main_witt_wastage(workflow: str, seed: int, error_metric: str, alpha: float, use_softmax: bool, use_online_grid: bool): + """ + Main function to compute wastage-minimizing first allocations for tasks in a workflow. + + This function processes historical resource usage data, trains a LowWastageRegression model, evaluates its predictions, and writes the results to CSV files. + + :param workflow: Name of the workflow to process. + :param seed: Random seed for reproducibility. + :param error_metric: Metric used to evaluate prediction errors. + :param alpha: Alpha parameter for smoothing or weighting predictions. + :param use_softmax: Boolean flag indicating whether to use softmax in predictions. + :param use_online_grid: Online-grid-search flag; forwarded to the result-CSV helpers (`check_substring_in_csv`, `write_result_to_csv`). + """ import time @@ -266,13 +403,13 @@ def main_witt_wastage(workflow: str, seed: int, error_metric: str, alpha: float, if check_substring_in_csv(workflow, alpha, use_softmax_single, error_metric, "Witt-Ice", task, "Default", - "Default", seed): + "Default", seed, use_online_grid): continue REL_TTF = 1.0 BASE = 2 MIN_ALLOC = 0.01 - print("Task: " + task) + logging.debug("Task: " + task) df = df_all_tasks[df_all_tasks['process'] == task] if (len(df) < 34): @@ -288,10 +425,10 @@ def main_witt_wastage(workflow: str, seed: int, error_metric: str, alpha: float, lwr = LowWastageRegression(training, predictor_column='input_size', resource_column='peak_rss', run_time_column='realtime', relative_time_to_failure=1., min_allocation=0.01) - print("best_params: {0}".format(lwr.model)) - print("maq: {:.2f}% failures: {:.2f}%".format(lwr.quality.maq * 100, lwr.quality.failures / df.size * 100)) + logging.debug("best_params: {0}".format(lwr.model)) + logging.debug("maq: {:.2f}% failures: {:.2f}%".format(lwr.quality.maq * 100, 
lwr.quality.failures / df.size * 100)) time_needed = time.time() - before - print("\ntime needed: {0}".format(time_needed)) + logging.debug("\ntime needed: {0}".format(time_needed)) evaluation = df.drop(training.index) evaluation['first_allocation'] = lwr.predict(evaluation) @@ -299,7 +436,6 @@ def main_witt_wastage(workflow: str, seed: int, error_metric: str, alpha: float, w = Wastage.exponential(evaluation, 1.0, resource_column='peak_rss', first_allocation_column='first_allocation', run_time_column='realtime', workflow=workflow) - print(w) for index, row in evaluation.iterrows(): write_single_task_to_csv("Witt-Ice", "Default", "Default", workflow, "smoothed_mape", True, @@ -307,10 +443,8 @@ def main_witt_wastage(workflow: str, seed: int, error_metric: str, alpha: float, row["Wastage_GBh"], row["Predictions"], row["peak_rss"], row["first_allocation"], len(row["Predictions"])-1, alpha, row["realtime"], len(row["Predictions"])-1, seed) - write_result_to_csv("Witt-Ice", "Default", "Default", task, - w.wastage_GB * 1000000000, w.wastage_GB * 1000000, w.wastage_GB, w.wastage_GBh * 1000000, - w.wastage_GBh, - w.failures, w.runtime_h, "1", workflow, time_needed, "1.0", alpha, use_softmax_single, {}, error_metric, "-1", seed) + write_result_to_csv("Witt-Ice", "Default", "Default", task, w.wastage_GB, w.wastage_GBh, + w.failures, w.runtime_h, "1", workflow, time_needed, "1.0", alpha, use_softmax_single, {}, error_metric, "-1", seed, use_online_grid) #training.plot.scatter(x='input_size', y='rss') #import matplotlib.pyplot as plt diff --git a/experiments.sh b/experiments.sh new file mode 100644 index 0000000..d808b7f --- /dev/null +++ b/experiments.sh @@ -0,0 +1,58 @@ +#!/usr/bin/env bash +set -euo pipefail + +# Config +DATA_DIR="./data" +FILES=( + "trace_chipseq.csv" + "trace_eager.csv" + "trace_iwd.csv" + "trace_mag.csv" + "trace_methylseq.csv" + "trace_rnaseq.csv" +) +ALPHAS=("0.0" "0.25" "0.5" "0.75" "1.0") +SOFTMAX=("True" "False") +SEEDS=("1001" "1111" "1234" "1996" 
"2024") # five fixed 4-digit seeds +ERROR_METRIC="smoothed_mape" + +LOG_DIR="sweep_logs" +mkdir -p "$LOG_DIR" + +run_one () { + local file="$1" + local alpha="$2" + local softmax="$3" + local seed="$4" + local use_grid_flag="$5" # "" or "--use_online_grid" + + # Create unique log filename + local base_name + base_name=$(basename "$file" .csv) + local grid_tag + if [ -z "$use_grid_flag" ]; then + grid_tag="nogrid" + else + grid_tag="grid" + fi + local log_file="${LOG_DIR}/${base_name}_a${alpha}_s${softmax}_seed${seed}_${grid_tag}.log" + + echo ">>> Running: ${file}, alpha=${alpha}, softmax=${softmax}, seed=${seed}, ${grid_tag}" + /usr/bin/time -f "Run time: %E (elapsed), %U (user), %S (sys)" \ + python main.py "${DATA_DIR}/${file}" "${alpha}" "${softmax}" "${ERROR_METRIC}" "${seed}" ${use_grid_flag} \ + >"$log_file" 2>&1 +} + +# Sweep +for file in "${FILES[@]}"; do + for alpha in "${ALPHAS[@]}"; do + for soft in "${SOFTMAX[@]}"; do + for seed in "${SEEDS[@]}"; do + run_one "$file" "$alpha" "$soft" "$seed" "" + run_one "$file" "$alpha" "$soft" "$seed" "--use_online_grid" + done + done + done +done + +echo "All runs completed. Logs saved in $LOG_DIR/" diff --git a/helper/get_tasks.py b/helper/get_tasks.py index 9bfda80..3432cb4 100644 --- a/helper/get_tasks.py +++ b/helper/get_tasks.py @@ -5,6 +5,22 @@ def getTasks(): + """ + Reads task data from multiple CSV files in the `./data/eager/` directory, processes the data, + and returns a DataFrame containing aggregated task information. + + The function filters out rows with invalid `io_read_bytes` and `io_write_bytes` values, + computes statistics such as median input size, maximum memory usage, and runtime, + and organizes the data into a structured format. + + :return: A pandas DataFrame with columns: + - Name (str): Task name extracted from the filename. + - Split (int): Number of splits in the filename. + - input_size (float): Median of `io_read_bytes` values. + - output_size (int): Fixed value of 1 (placeholder). 
+ - rss (float): Maximum memory usage in MB. + - run_time (float): Runtime in seconds, calculated from the timestamp range. + """ files = [f for f in glob.glob("./data/eager/task_*", recursive=False)] data = [] @@ -22,6 +38,13 @@ def getTasks(): def getTasksFromCSV(filename: str): + """ + Reads task data from a single CSV file, renames the `rchar` column to `input_size`, + and returns the processed DataFrame. + + :param filename: Path to the CSV file to be read. + :return: A pandas DataFrame with the renamed column `input_size`. + """ csv = pd.read_csv(filename) csv = csv.rename(columns={"rchar": "input_size"}) return csv \ No newline at end of file diff --git a/main.py b/main.py index 3849141..ad37fd3 100644 --- a/main.py +++ b/main.py @@ -2,6 +2,8 @@ import sys import time import warnings +import argparse +import os import numpy as np import pandas as pd @@ -11,7 +13,7 @@ from approach.abstract_predictor import PredictionMethod from approach.experiment_constants import ERROR_STRATEGY, OFFSET_STRATEGY -from approach.helper import byte_and_time_to_mbh, ms_to_h, byte_and_time_to_gbh, byte_to_gigabyte, byte_to_mb, \ +from approach.helper import ms_to_h, byte_and_time_to_gbh, byte_to_gigabyte, byte_to_mb, \ write_result_to_csv, check_substring_in_csv, write_single_task_to_csv from approach.sizing_tasks import Sizey from baselines.tovar import TovarPredictor @@ -23,10 +25,27 @@ warnings.filterwarnings(action='ignore', category=UserWarning) warnings.filterwarnings(action='ignore', category=ConvergenceWarning) -logging.basicConfig(level=logging.DEBUG) +logging.basicConfig( + level=logging.DEBUG, + format="%(asctime)s | %(levelname)-8s | %(name)s | %(message)s", + datefmt="%Y-%m-%d %H:%M:%S" +) + + +def parse_bool(value: str) -> bool: + """Parse boolean values for CLI arguments.""" + true_set = {"true", "1", "yes", "y"} + false_set = {"false", "0", "no", "n"} + lower = value.lower() + if lower in true_set: + return True + if lower in false_set: + return False + raise 
argparse.ArgumentTypeError("Expected a boolean value for softmax") def is_numeric(value): + """Check if a value is numeric.""" try: float(value) return True @@ -38,18 +57,15 @@ def run_online_and_calculate_wastage(method_name: str, taskname: str, error_stra prediction_method: PredictionMethod, X_test_inner, y_test_inner, additionalTime, user_estimate_mem, workflow: str, alpha: float, use_softmax: bool, - error_metric: str, seed: int): + error_metric: str, seed: int, use_online_grid: bool): + """ + Run the online prediction method and calculate wastage metrics. + """ usage = 0 failures = 0 - wastage_in_bytes_over = 0 - wastage_in_mb_over = 0 wastage_in_gb_over = 0 - wastage_mbH_over = 0 wastage_gbh_over = 0 - wastage_in_bytes_under = 0 - wastage_in_mb_under = 0 wastage_in_gb_under = 0 - wastage_mbH_under = 0 wastage_gbh_under = 0 time_start = time.time() sumRuntimeTasks = 0 @@ -58,7 +74,7 @@ def run_online_and_calculate_wastage(method_name: str, taskname: str, error_stra # Check if entry already exists. 
Helpful in case an execution failed if check_substring_in_csv(workflow, alpha, use_softmax, error_metric, method_name, taskname, offset_strat, - error_strat, seed): + error_strat, seed, use_online_grid): return # Online Learning @@ -88,8 +104,7 @@ def run_online_and_calculate_wastage(method_name: str, taskname: str, error_stra memory_prediction_from_method = memory_prediction_from_method[0][0] raw_memory_prediction_from_method = raw_memory_prediction_from_method[0][0] - print(method_name) - print("Prediction: " + str(memory_prediction_from_method)) + logging.debug("Prediction: " + str(memory_prediction_from_method)) predictions.append(memory_prediction_from_method) raw_prediction = raw_memory_prediction_from_method @@ -97,10 +112,7 @@ def run_online_and_calculate_wastage(method_name: str, taskname: str, error_stra actualList.append(entry) if memory_prediction_from_method >= entry: - wastage_in_bytes_over = wastage_in_bytes_over + (memory_prediction_from_method - entry) - wastage_in_mb_over = wastage_in_mb_over + byte_to_mb(memory_prediction_from_method - entry) wastage_in_gb_over = wastage_in_gb_over + byte_to_gigabyte(memory_prediction_from_method - entry) - wastage_mbH_over = wastage_mbH_over + byte_and_time_to_mbh(memory_prediction_from_method - entry, runtime) wastage_gbh_over = wastage_gbh_over + byte_and_time_to_gbh(memory_prediction_from_method - entry, runtime) task_iteration_gbh = task_iteration_gbh + byte_and_time_to_gbh(memory_prediction_from_method - entry, runtime) @@ -108,10 +120,7 @@ def run_online_and_calculate_wastage(method_name: str, taskname: str, error_stra logging.debug("Wasted " + str((memory_prediction_from_method - entry)) + " bytes") elif memory_prediction_from_method < entry: logging.debug("Handle underprediction") - wastage_in_bytes_under = wastage_in_bytes_under + (memory_prediction_from_method) - wastage_in_mb_under = wastage_in_mb_under + byte_to_mb(memory_prediction_from_method) wastage_in_gb_under = wastage_in_gb_under + 
byte_to_gigabyte(memory_prediction_from_method) - wastage_mbH_under = wastage_mbH_under + byte_and_time_to_mbh(memory_prediction_from_method, runtime) wastage_gbh_under = wastage_gbh_under + byte_and_time_to_gbh(memory_prediction_from_method, runtime) task_iteration_gbh = task_iteration_gbh + byte_and_time_to_gbh(memory_prediction_from_method, runtime) @@ -127,11 +136,7 @@ def run_online_and_calculate_wastage(method_name: str, taskname: str, error_stra predictions.append(memory_prediction_from_method) raw_prediction = raw_memory_prediction_from_method if memory_prediction_from_method < entry: - wastage_in_bytes_under = wastage_in_bytes_under + (memory_prediction_from_method) - wastage_in_mb_under = wastage_in_mb_under + byte_to_mb(memory_prediction_from_method) wastage_in_gb_under = wastage_in_gb_under + byte_to_gigabyte(memory_prediction_from_method) - wastage_mbH_under = wastage_mbH_under + byte_and_time_to_mbh(memory_prediction_from_method, - runtime) wastage_gbh_under = wastage_gbh_under + byte_and_time_to_gbh(memory_prediction_from_method, runtime) task_iteration_gbh = task_iteration_gbh + byte_and_time_to_gbh(memory_prediction_from_method, @@ -139,11 +144,7 @@ def run_online_and_calculate_wastage(method_name: str, taskname: str, error_stra sumRuntimeTasks = sumRuntimeTasks + ms_to_h(runtime) continue else: - wastage_in_bytes_under = wastage_in_bytes_under + (memory_prediction_from_method - entry) - wastage_in_mb_under = wastage_in_mb_under + byte_to_mb(memory_prediction_from_method - entry) wastage_in_gb_under = wastage_in_gb_under + byte_to_gigabyte(memory_prediction_from_method - entry) - wastage_mbH_under = wastage_mbH_under + byte_and_time_to_mbh(memory_prediction_from_method - entry, - runtime) wastage_gbh_under = wastage_gbh_under + byte_and_time_to_gbh(memory_prediction_from_method - entry, runtime) task_iteration_gbh = task_iteration_gbh + byte_and_time_to_gbh( @@ -158,7 +159,7 @@ def run_online_and_calculate_wastage(method_name: str, taskname: str, 
error_stra else: prediction_method.update_model(entry, runtime) - print("Method: " + method_name + " predicted " + str( + logging.debug("Method: " + method_name + " predicted " + str( predictions) + " with an actual memory consumption of " + str(entry) + " and a runtime of " + str(runtime)) write_single_task_to_csv(method_name, error_strat, offset_strat, workflow, error_metric, use_softmax, taskname, task_iteration_gbh, predictions, entry, raw_prediction, @@ -166,138 +167,159 @@ def run_online_and_calculate_wastage(method_name: str, taskname: str, error_stra time_end = time.time() - maq = usage / (usage + wastage_gbh_over + wastage_in_gb_under) + maq = usage / (usage + wastage_gbh_over + wastage_gbh_under) + logging.debug(f"Error Metric: {error_metric}, Accuracy: {maq * 100}, Seed: {seed}") write_result_to_csv(method_name, error_strat, offset_strat, taskname, - wastage_in_bytes_under + wastage_in_bytes_over, wastage_in_mb_under + wastage_in_mb_over, - wastage_in_gb_under + wastage_in_gb_over, wastage_mbH_under + wastage_mbH_over, - wastage_gbh_under + wastage_gbh_over, failures, sumRuntimeTasks, len(y_test_inner), workflow, - time_end - time_start, maq * 100, alpha, use_softmax, prediction_method.get_number_subModels(), - error_metric, str(mean_absolute_percentage_error(actualList, predictionList)), seed) + wastage_in_gb_under + wastage_in_gb_over, wastage_gbh_under + wastage_gbh_over, failures, + sumRuntimeTasks, len(y_test_inner), workflow, time_end - time_start, maq * 100, alpha, + use_softmax, prediction_method.get_number_subModels(), error_metric, + str(mean_absolute_percentage_error(actualList, predictionList)), seed, use_online_grid) return wastage_in_gb_under + wastage_in_gb_over -df2 = getTasksFromCSV(sys.argv[1]) -unique_tasks = df2['process'].unique() -sizey_alpha = float(sys.argv[2]) -use_softmax = sys.argv[3] in "True" -seed = int(sys.argv[5]) - -error_metric = sys.argv[4] - -print(use_softmax) -if (sizey_alpha > 1.0) | (sizey_alpha < 0.0): - 
sys.exit() - -wf_name = sys.argv[1].split("_")[1].split('.')[0] - -for task in unique_tasks: - - new_dataF = df2[df2['process'] == task].copy() - new_dataF['rss'] = pd.to_numeric(new_dataF['rss'], errors='coerce') - new_dataF['input_size'] = pd.to_numeric(new_dataF['input_size'], errors='coerce') - new_dataF['memory'] = pd.to_numeric(new_dataF['memory'], errors='coerce') - new_dataF['peak_rss'] = pd.to_numeric(new_dataF['peak_rss'], errors='coerce') - new_dataF = new_dataF[new_dataF['rss'] > 0] # Filter out failed measurements - - # Remove task with fewer task instances, can be adjusted to filter out more tasks. - if (len(new_dataF) < 34): - continue - - # Measured runtime values of 0 indicate that a task instance has run too short to measure its resource usage. Therefore, the instance is removed. - if (new_dataF['realtime'] == 0).any(): - continue - - x2 = new_dataF['input_size'].to_frame() - y2 = new_dataF['rss'] - user_estimates = new_dataF['memory'] - - runtimes = new_dataF['realtime'] - - # The test size can be adjusted in order to define the historical data available. 
- X_train, X_test, y_train, y_test, runtime_train, runtime_test, user_estimates_train, user_estimates_test = train_test_split( - x2, y2, runtimes, user_estimates, - test_size=0.7, random_state=seed) - - witt_percentile_predictor = WittPercentilePredictor() - witt_percentile_predictor.initial_model_training(X_train, y_train) - - witt_lr_predictor_std = WittRegressionPredictor(OFFSET_STRATEGY.STD) - witt_lr_predictor_std.initial_model_training(X_train, y_train) - - witt_lr_predictor_stdunder = WittRegressionPredictor(OFFSET_STRATEGY.STDUNDER) - witt_lr_predictor_stdunder.initial_model_training(X_train, y_train) - - tovar_predictor = TovarPredictor() - tovar_predictor.initial_model_training(y_train, runtime_train) - - - run_online_and_calculate_wastage("Witt-LR", task, 'Default', OFFSET_STRATEGY.STD.name, witt_lr_predictor_std, - X_test, y_test, - runtime_test, user_estimates_test, wf_name, - sizey_alpha, - use_softmax, error_metric, seed) - - run_online_and_calculate_wastage("Tovar", task, 'Default', 'Default', tovar_predictor, - X_test, y_test, runtime_test, user_estimates_test, - wf_name, sizey_alpha, use_softmax, - error_metric, seed) - - run_online_and_calculate_wastage("Witt-Percentile", task, 'Default', 'Default', witt_percentile_predictor, - X_test, y_test, runtime_test, user_estimates_test, - wf_name, - sizey_alpha, use_softmax, error_metric, seed) - - run_online_and_calculate_wastage("Witt-LR", task, 'Default', OFFSET_STRATEGY.STDUNDER.name, - witt_lr_predictor_stdunder, X_test, y_test, - runtime_test, user_estimates_test, wf_name, - sizey_alpha, - use_softmax, error_metric, seed) - - filtered_original_data_for_default_comparison = new_dataF[new_dataF.index.isin(y_test.index)] - - if not check_substring_in_csv(wf_name, sizey_alpha, use_softmax, error_metric, - "Workflow-Presets", task, "Default", "Default", seed): - write_result_to_csv("Workflow-Presets", "Default", "Default", task, - (filtered_original_data_for_default_comparison["memory"] - - 
filtered_original_data_for_default_comparison["peak_rss"]).sum(), - str(byte_to_mb((filtered_original_data_for_default_comparison["memory"] - - filtered_original_data_for_default_comparison["peak_rss"]).sum())), - str(byte_to_gigabyte((filtered_original_data_for_default_comparison["memory"] - - filtered_original_data_for_default_comparison[ - "peak_rss"]).sum())), - str(((filtered_original_data_for_default_comparison["memory"] - - filtered_original_data_for_default_comparison["peak_rss"]) * 0.000001 * - filtered_original_data_for_default_comparison[ - "realtime"] / 3600000.0).sum()), - str(((filtered_original_data_for_default_comparison["memory"] - - filtered_original_data_for_default_comparison["peak_rss"]) * 0.000000001 * - filtered_original_data_for_default_comparison[ - "realtime"] / 3600000.0).sum()), - 0, - filtered_original_data_for_default_comparison["realtime"].sum() / 3600000.0, - len(filtered_original_data_for_default_comparison), - wf_name, 0, - (filtered_original_data_for_default_comparison["realtime"] / 3600000.0 * - filtered_original_data_for_default_comparison["peak_rss"] * 0.000000001).sum() / - ((filtered_original_data_for_default_comparison["realtime"] / 3600000.0 * - filtered_original_data_for_default_comparison["peak_rss"] * 0.000000001).sum() + - ((filtered_original_data_for_default_comparison["memory"] - - filtered_original_data_for_default_comparison["peak_rss"]) * 0.000000001 * - filtered_original_data_for_default_comparison["realtime"] / 3600000.0).sum() - ) * 100, sizey_alpha, use_softmax, {}, error_metric, "-1", seed) - - # You can configure multiple/all Sizey configurations. 
Currently, it uses the paper default - for error_strat in ERROR_STRATEGY: - for offset_strat in OFFSET_STRATEGY: - if (offset_strat.name == "DYNAMIC") & (error_strat.name == "MAX_EVER_OBSERVED"): - sizey = Sizey(X_train, y_train.values.reshape(-1, 1), sizey_alpha, offset_strat, 0.05, - error_strat, use_softmax, error_metric) - run_online_and_calculate_wastage("Sizey", task, error_strat.name, offset_strat.name, sizey, X_test, - y_test, runtime_test, user_estimates_test, - wf_name, - sizey_alpha, use_softmax, error_metric, seed) - - - -main_witt_wastage(wf_name, seed, error_metric, sizey_alpha, use_softmax) +def main(filename: str, alpha: float, softmax: bool, error_metric: str, seed: int, + batch_size: int = 1, retrain_interval: int | None = None, use_online_grid: bool = False) -> None: + """Run the complete Sizey evaluation pipeline. + + :param filename: Path to the workflow CSV. + :param alpha: Value for the RAQ weight. + :param softmax: Whether to use the softmax ensemble. + :param error_metric: Error metric to train the models. + :param seed: Random seed for the data split. + :param batch_size: Number of samples used for each mini-batch update. + :param retrain_interval: After how many mini-batch updates a full model + re-training should be executed. ``None`` or ``0`` disables the + periodic refresh. + :param use_online_grid: Whether to use the online grid search for + hyperparameter tuning. 
+ """ + df2 = getTasksFromCSV(filename) + unique_tasks = df2['process'].unique() + + sizey_alpha = alpha + use_softmax = softmax + wf_name = filename.split("_")[1].split('.')[0] + + for task in unique_tasks: + + new_dataF = df2[df2['process'] == task].copy() + new_dataF['rss'] = pd.to_numeric(new_dataF['rss'], errors='coerce') + new_dataF['input_size'] = pd.to_numeric(new_dataF['input_size'], errors='coerce') + new_dataF['memory'] = pd.to_numeric(new_dataF['memory'], errors='coerce') + new_dataF['peak_rss'] = pd.to_numeric(new_dataF['peak_rss'], errors='coerce') + new_dataF = new_dataF[new_dataF['rss'] > 0] # Filter out failed measurements + + # Skip tasks with fewer than 34 instances; the threshold can be adjusted to filter out more tasks. + if (len(new_dataF) < 34): + continue + + # A measured runtime of 0 indicates that a task instance ran too briefly to measure its resource usage. If any instance has such a runtime, the whole task is skipped. + if (new_dataF['realtime'] == 0).any(): + continue + + x2 = new_dataF['input_size'].to_frame() + y2 = new_dataF['rss'] + user_estimates = new_dataF['memory'] + + runtimes = new_dataF['realtime'] + + # The test size can be adjusted to control how much historical data is available for training.
+ X_train, X_test, y_train, y_test, runtime_train, runtime_test, user_estimates_train, user_estimates_test = train_test_split( + x2, y2, runtimes, user_estimates, + test_size=0.7, random_state=seed) + + witt_percentile_predictor = WittPercentilePredictor() + witt_percentile_predictor.initial_model_training(X_train, y_train) + + witt_lr_predictor_std = WittRegressionPredictor(OFFSET_STRATEGY.STD) + witt_lr_predictor_std.initial_model_training(X_train, y_train) + + witt_lr_predictor_stdunder = WittRegressionPredictor(OFFSET_STRATEGY.STDUNDER) + witt_lr_predictor_stdunder.initial_model_training(X_train, y_train) + + tovar_predictor = TovarPredictor() + tovar_predictor.initial_model_training(y_train, runtime_train) + + run_online_and_calculate_wastage("Witt-LR", task, 'Default', OFFSET_STRATEGY.STD.name, witt_lr_predictor_std, + X_test, y_test, + runtime_test, user_estimates_test, wf_name, + sizey_alpha, + use_softmax, error_metric, seed, use_online_grid) + + run_online_and_calculate_wastage("Tovar", task, 'Default', 'Default', tovar_predictor, + X_test, y_test, runtime_test, user_estimates_test, + wf_name, sizey_alpha, use_softmax, + error_metric, seed, use_online_grid) + + run_online_and_calculate_wastage("Witt-Percentile", task, 'Default', 'Default', witt_percentile_predictor, + X_test, y_test, runtime_test, user_estimates_test, + wf_name, + sizey_alpha, use_softmax, error_metric, seed, use_online_grid) + + run_online_and_calculate_wastage("Witt-LR", task, 'Default', OFFSET_STRATEGY.STDUNDER.name, + witt_lr_predictor_stdunder, X_test, y_test, + runtime_test, user_estimates_test, wf_name, + sizey_alpha, + use_softmax, error_metric, seed, use_online_grid) + + filtered_original_data_for_default_comparison = new_dataF[new_dataF.index.isin(y_test.index)] + + if not check_substring_in_csv(wf_name, sizey_alpha, use_softmax, error_metric, + "Workflow-Presets", task, "Default", "Default", seed, use_online_grid): + write_result_to_csv("Workflow-Presets", "Default", "Default", 
task, + str(byte_to_gigabyte((filtered_original_data_for_default_comparison["memory"] - + filtered_original_data_for_default_comparison[ + "peak_rss"]).sum())), + str(((filtered_original_data_for_default_comparison["memory"] - + filtered_original_data_for_default_comparison["peak_rss"]) * 0.000000001 * + filtered_original_data_for_default_comparison[ + "realtime"] / 3600000.0).sum()), + 0, + filtered_original_data_for_default_comparison["realtime"].sum() / 3600000.0, + len(filtered_original_data_for_default_comparison), + wf_name, 0, + (filtered_original_data_for_default_comparison["realtime"] / 3600000.0 * + filtered_original_data_for_default_comparison["peak_rss"] * 0.000000001).sum() / + ((filtered_original_data_for_default_comparison["realtime"] / 3600000.0 * + filtered_original_data_for_default_comparison["peak_rss"] * 0.000000001).sum() + + ((filtered_original_data_for_default_comparison["memory"] - + filtered_original_data_for_default_comparison["peak_rss"]) * 0.000000001 * + filtered_original_data_for_default_comparison["realtime"] / 3600000.0).sum() + ) * 100, sizey_alpha, use_softmax, {}, error_metric, "-1", seed, use_online_grid) + + # You can configure multiple/all Sizey configurations. 
Currently, it uses the paper default + for error_strat in ERROR_STRATEGY: + for offset_strat in OFFSET_STRATEGY: + if (offset_strat.name == "DYNAMIC") & (error_strat.name == "MAX_EVER_OBSERVED"): + sizey = Sizey(X_train, y_train, sizey_alpha, offset_strat, 0.05, + error_strat, use_softmax, error_metric, batch_size, + retrain_interval, use_online_grid) + run_online_and_calculate_wastage("Sizey", task, error_strat.name, offset_strat.name, sizey, X_test, + y_test, runtime_test, user_estimates_test, + wf_name, + sizey_alpha, use_softmax, error_metric, seed, use_online_grid) + + main_witt_wastage(wf_name, seed, error_metric, sizey_alpha, use_softmax, use_online_grid) + + +if __name__ == "__main__": + parser = argparse.ArgumentParser(description="Run Sizey memory predictions") + parser.add_argument("filename", help="CSV workflow file, e.g. ./data/trace_methylseq.csv") + parser.add_argument("alpha", type=float, help="Alpha value between 0.0 and 1.0") + parser.add_argument("softmax", type=parse_bool, help="Enable softmax ensemble (True/False)") + parser.add_argument("error_metric", choices=["smoothed_mape", "neg_mean_squared_error"], + help="Error metric for model training") + parser.add_argument("seed", type=int, help="Random seed for train/test split") + parser.add_argument("--use_online_grid", action="store_true", + help="Use online grid search for hyperparameter tuning") + args = parser.parse_args() + + if not os.path.isfile(args.filename): + parser.error(f"File '{args.filename}' does not exist") + if not 0.0 <= args.alpha <= 1.0: + parser.error("alpha must be between 0.0 and 1.0") + + main(filename=args.filename, alpha=args.alpha, softmax=args.softmax, + error_metric=args.error_metric, seed=args.seed, use_online_grid=args.use_online_grid) +