Time consumption report after composition (#1257)
Added a report on time consumption in the API. The following steps are presented in this report (an illustrative usage sketch follows the list):
- 'Data Definition (fit)': Time spent on data definition in fit().
- 'Data Preprocessing': Total time spent on data preprocessing, including the fitting and predicting stages.
- 'Fitting (summary)': Total time spent on composing, tuning, and train inference.
- 'Composing': Time spent on searching for the best pipeline.
- 'Train Inference': Time spent on training the pipeline found during composing.
- 'Tuning (composing)': Time spent on hyperparameter tuning during the whole fitting, if with_tuning is True.
- 'Tuning (after)': Time spent on hyperparameter tuning via .tune() after composing.
- 'Data Definition (predict)': Time spent on data definition in predict().
- 'Predicting': Time spent on predicting (inference).
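
A minimal usage sketch of the new report. The dataset, problem type, and timeout below are illustrative assumptions, not part of the commit; only return_report() itself is introduced here.

from sklearn.datasets import load_breast_cancer
from sklearn.model_selection import train_test_split

from fedot.api.main import Fedot

# Toy data, just to exercise the API end to end.
x, y = load_breast_cancer(return_X_y=True)
x_train, x_test, y_train, y_test = train_test_split(x, y, random_state=42)

model = Fedot(problem='classification', timeout=1)
model.fit(features=x_train, target=y_train)
model.predict(features=x_test)

# One row per stage listed above, with the measured durations
# broken into days/hours/minutes/seconds components.
print(model.return_report())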
aPovidlo authored Feb 26, 2024
1 parent c53881a commit c17381c
Showing 5 changed files with 232 additions and 67 deletions.
38 changes: 22 additions & 16 deletions fedot/api/api_utils/api_composer.py
@@ -19,6 +19,7 @@
 from fedot.core.pipelines.pipeline import Pipeline
 from fedot.core.pipelines.tuning.tuner_builder import TunerBuilder
 from fedot.core.repository.metrics_repository import MetricIDType
+from fedot.utilities.composer_timer import fedot_composer_timer


 class ApiComposer:
@@ -52,28 +53,33 @@ def init_cache(self):

     def obtain_model(self, train_data: InputData) -> Tuple[Pipeline, Sequence[Pipeline], OptHistory]:
         """ Function for composing FEDOT pipeline model """
-        timeout: float = self.params.timeout
-        with_tuning = self.params.get('with_tuning')
+        with fedot_composer_timer.launch_composing():
+            timeout: float = self.params.timeout
+            with_tuning = self.params.get('with_tuning')

-        self.timer = ApiTime(time_for_automl=timeout, with_tuning=with_tuning)
+            self.timer = ApiTime(time_for_automl=timeout, with_tuning=with_tuning)

-        initial_assumption, fitted_assumption = self.propose_and_fit_initial_assumption(train_data)
+            initial_assumption, fitted_assumption = self.propose_and_fit_initial_assumption(train_data)

-        multi_objective = len(self.metrics) > 1
-        self.params.init_params_for_composing(self.timer.timedelta_composing, multi_objective)
+            multi_objective = len(self.metrics) > 1
+            self.params.init_params_for_composing(self.timer.timedelta_composing, multi_objective)

-        self.log.message(f"AutoML configured."
-                         f" Parameters tuning: {with_tuning}."
-                         f" Time limit: {timeout} min."
-                         f" Set of candidate models: {self.params.get('available_operations')}.")
+            self.log.message(f"AutoML configured."
+                             f" Parameters tuning: {with_tuning}."
+                             f" Time limit: {timeout} min."
+                             f" Set of candidate models: {self.params.get('available_operations')}.")

-        best_pipeline, best_pipeline_candidates, gp_composer = self.compose_pipeline(
-            train_data,
-            initial_assumption,
-            fitted_assumption
-        )
+            best_pipeline, best_pipeline_candidates, gp_composer = self.compose_pipeline(
+                train_data,
+                initial_assumption,
+                fitted_assumption
+            )
+
         if with_tuning:
-            best_pipeline = self.tune_final_pipeline(train_data, best_pipeline)
+            with fedot_composer_timer.launch_tuning('composing'):
+                best_pipeline = self.tune_final_pipeline(train_data, best_pipeline)

         if gp_composer.history:
             adapter = self.params.graph_generation_params.adapter
             gp_composer.history.tuning_result = adapter.adapt(best_pipeline)
135 changes: 86 additions & 49 deletions fedot/api/main.py
@@ -35,6 +35,7 @@
 from fedot.remote.remote_evaluator import RemoteEvaluator
 from fedot.utilities.define_metric_by_task import MetricByTask
 from fedot.utilities.memory import MemoryAnalytics
+from fedot.utilities.composer_timer import fedot_composer_timer
 from fedot.utilities.project_import_export import export_project_to_zip, import_project_from_zip

 NOT_FITTED_ERR_MSG = 'Model not fitted yet'
@@ -118,6 +119,8 @@ def __init__(self,
         self.best_models: Sequence[Pipeline] = ()
         self.history: Optional[OptHistory] = None

+        fedot_composer_timer.reset_timer()
+
     def fit(self,
             features: FeaturesType,
             target: TargetType = 'target',
@@ -140,7 +143,9 @@ def fit(self,

         self.target = target

-        self.train_data = self.data_processor.define_data(features=features, target=target, is_predict=False)
+        with fedot_composer_timer.launch_data_definition('fit'):
+            self.train_data = self.data_processor.define_data(features=features, target=target, is_predict=False)
+
         self.params.update_available_operations_by_preset(self.train_data)

         if self.params.get('use_input_preprocessing'):
@@ -158,26 +163,30 @@
             self._init_remote_if_necessary()

         if isinstance(self.train_data, InputData) and self.params.get('use_auto_preprocessing'):
-            self.train_data = self.data_processor.fit_transform(self.train_data)
+            with fedot_composer_timer.launch_preprocessing():
+                self.train_data = self.data_processor.fit_transform(self.train_data)

-        if predefined_model is not None:
-            # Fit predefined model and return it without composing
-            self.current_pipeline = PredefinedModel(predefined_model, self.train_data, self.log,
-                                                    use_input_preprocessing=self.params.get(
-                                                        'use_input_preprocessing')).fit()
-        else:
-            self.current_pipeline, self.best_models, self.history = self.api_composer.obtain_model(self.train_data)
+        with fedot_composer_timer.launch_fitting():
+            if predefined_model is not None:
+                # Fit predefined model and return it without composing
+                self.current_pipeline = PredefinedModel(predefined_model, self.train_data, self.log,
+                                                        use_input_preprocessing=self.params.get(
+                                                            'use_input_preprocessing')).fit()
+            else:
+                self.current_pipeline, self.best_models, self.history = self.api_composer.obtain_model(self.train_data)

-        if self.current_pipeline is None:
-            raise ValueError('No models were found')
+            if self.current_pipeline is None:
+                raise ValueError('No models were found')

-        full_train_not_preprocessed = deepcopy(self.train_data)
-        # Final fit for obtained pipeline on full dataset
-        if self.history and not self.history.is_empty() or not self.current_pipeline.is_fitted:
-            self._train_pipeline_on_full_dataset(recommendations_for_data, full_train_not_preprocessed)
-            self.log.message('Final pipeline was fitted')
-        else:
-            self.log.message('Already fitted initial pipeline is used')
+            full_train_not_preprocessed = deepcopy(self.train_data)
+            # Final fit for obtained pipeline on full dataset
+
+        with fedot_composer_timer.launch_train_inference():
+            if self.history and not self.history.is_empty() or not self.current_pipeline.is_fitted:
+                self._train_pipeline_on_full_dataset(recommendations_for_data, full_train_not_preprocessed)
+                self.log.message('Final pipeline was fitted')
+            else:
+                self.log.message('Already fitted initial pipeline is used')

         # Merge API & pipelines encoders if it is required
         self.current_pipeline.preprocessor = BasePreprocessor.merge_preprocessors(
@@ -217,26 +226,28 @@ def tune(self,
         if self.current_pipeline is None:
             raise ValueError(NOT_FITTED_ERR_MSG)

-        input_data = input_data or self.train_data
-        cv_folds = cv_folds or self.params.get('cv_folds')
-        n_jobs = n_jobs or self.params.n_jobs
+        with fedot_composer_timer.launch_tuning('post'):
+            input_data = input_data or self.train_data
+            cv_folds = cv_folds or self.params.get('cv_folds')
+            n_jobs = n_jobs or self.params.n_jobs

-        metric = metric_name if metric_name else self.metrics[0]
+            metric = metric_name if metric_name else self.metrics[0]

-        pipeline_tuner = (TunerBuilder(self.params.task)
-                          .with_tuner(SimultaneousTuner)
-                          .with_cv_folds(cv_folds)
-                          .with_n_jobs(n_jobs)
-                          .with_metric(metric)
-                          .with_iterations(iterations)
-                          .with_timeout(timeout)
-                          .build(input_data))
+            pipeline_tuner = (TunerBuilder(self.params.task)
+                              .with_tuner(SimultaneousTuner)
+                              .with_cv_folds(cv_folds)
+                              .with_n_jobs(n_jobs)
+                              .with_metric(metric)
+                              .with_iterations(iterations)
+                              .with_timeout(timeout)
+                              .build(input_data))

-        self.current_pipeline = pipeline_tuner.tune(self.current_pipeline, show_progress)
-        self.api_composer.was_tuned = pipeline_tuner.was_tuned
+            self.current_pipeline = pipeline_tuner.tune(self.current_pipeline, show_progress)
+            self.api_composer.was_tuned = pipeline_tuner.was_tuned

-        # Tuner returns a not fitted pipeline, and it is required to fit on train dataset
-        self.current_pipeline.fit(self.train_data)
+            # Tuner returns a not fitted pipeline, and it is required to fit on train dataset
+            self.current_pipeline.fit(self.train_data)

         return self.current_pipeline

     def predict(self,
@@ -262,16 +273,19 @@ def predict(self,
         if self.current_pipeline is None:
             raise ValueError(NOT_FITTED_ERR_MSG)

-        self.test_data = self.data_processor.define_data(target=self.target, features=features, is_predict=True)
+        with fedot_composer_timer.launch_data_definition('predict'):
+            self.test_data = self.data_processor.define_data(target=self.target, features=features, is_predict=True)
         self._is_in_sample_prediction = in_sample

         if isinstance(self.test_data, InputData) and self.params.get('use_auto_preprocessing'):
-            self.test_data = self.data_processor.transform(self.test_data, self.current_pipeline)
+            with fedot_composer_timer.launch_preprocessing():
+                self.test_data = self.data_processor.transform(self.test_data, self.current_pipeline)

-        self.prediction = self.data_processor.define_predictions(current_pipeline=self.current_pipeline,
-                                                                 test_data=self.test_data,
-                                                                 in_sample=self._is_in_sample_prediction,
-                                                                 validation_blocks=validation_blocks)
+        with fedot_composer_timer.launch_predicting():
+            self.prediction = self.data_processor.define_predictions(current_pipeline=self.current_pipeline,
+                                                                     test_data=self.test_data,
+                                                                     in_sample=self._is_in_sample_prediction,
+                                                                     validation_blocks=validation_blocks)

         if save_predictions:
             self.save_predict(self.prediction)
@@ -296,18 +310,19 @@ def predict_proba(self,
         if self.current_pipeline is None:
             raise ValueError(NOT_FITTED_ERR_MSG)

-        if self.params.task.task_type == TaskTypesEnum.classification:
-            self.test_data = self.data_processor.define_data(target=self.target,
-                                                             features=features, is_predict=True)
+        with fedot_composer_timer.launch_predicting():
+            if self.params.task.task_type == TaskTypesEnum.classification:
+                self.test_data = self.data_processor.define_data(target=self.target,
+                                                                 features=features, is_predict=True)

-            mode = 'full_probs' if probs_for_all_classes else 'probs'
+                mode = 'full_probs' if probs_for_all_classes else 'probs'

-            self.prediction = self.current_pipeline.predict(self.test_data, output_mode=mode)
+                self.prediction = self.current_pipeline.predict(self.test_data, output_mode=mode)

-            if save_predictions:
-                self.save_predict(self.prediction)
-        else:
-            raise ValueError('Probabilities of predictions are available only for classification')
+                if save_predictions:
+                    self.save_predict(self.prediction)
+            else:
+                raise ValueError('Probabilities of predictions are available only for classification')

         return self.prediction.predict
@@ -497,6 +512,28 @@ def explain(self, features: FeaturesType = None,

         return explainer

+    def return_report(self) -> pd.DataFrame:
+        """ Function returns a report on time consumption.
+        The following steps are presented in this report:
+        - 'Data Definition (fit)': Time spent on data definition in fit().
+        - 'Data Preprocessing': Total time spent on data preprocessing, including the fitting and predicting stages.
+        - 'Fitting (summary)': Total time spent on composing, tuning, and train inference.
+        - 'Composing': Time spent on searching for the best pipeline.
+        - 'Train Inference': Time spent on training the pipeline found during composing.
+        - 'Tuning (composing)': Time spent on hyperparameter tuning during the whole fitting, if with_tuning is True.
+        - 'Tuning (after)': Time spent on hyperparameter tuning via .tune() after composing.
+        - 'Data Definition (predict)': Time spent on data definition in predict().
+        - 'Predicting': Time spent on predicting (inference).
+        """
+        report = fedot_composer_timer.report
+
+        if self.current_pipeline is None:
+            raise ValueError(NOT_FITTED_ERR_MSG)
+
+        report = pd.DataFrame(data=report.values(), index=report.keys())
+        return report.iloc[:, 0].dt.components.iloc[:, :-2]
+
     @staticmethod
     def _init_logger(logging_level: int):
         # reset logging level for Singleton
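
For reference, the last line of return_report() leans on pandas Timedelta components. A self-contained sketch with a made-up duration shows what it produces:

import pandas as pd

# One made-up stage duration, shaped like the timer report.
report = pd.DataFrame({'spend_time': pd.to_timedelta(['0 days 00:01:30.250'])},
                      index=['Composing'])

# .dt.components expands each timedelta into days, hours, minutes, seconds,
# milliseconds, microseconds and nanoseconds; iloc[:, :-2] drops the last two.
print(report.iloc[:, 0].dt.components.iloc[:, :-2])
#            days  hours  minutes  seconds  milliseconds
# Composing     0      0        1       30           250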
@@ -56,7 +56,7 @@ def check_and_update_params(self):
         early_stopping_rounds = self.params.get('early_stopping_rounds')
         use_eval_set = self.params.get('use_eval_set')

-        if use_best_model or early_stopping_rounds and not use_eval_set:
+        if (use_best_model or isinstance(early_stopping_rounds, int)) and not use_eval_set:
             self.params.update(use_best_model=False, early_stopping_rounds=False)

     @staticmethod
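
The one-line change above fixes an operator precedence bug: in Python, and binds tighter than or, so the old condition could reset the eval-set-dependent options even when an eval set was supplied. A small demonstration with hypothetical values:

# Hypothetical parameter values, not taken from the commit.
use_best_model, early_stopping_rounds, use_eval_set = True, 10, True

old_condition = use_best_model or early_stopping_rounds and not use_eval_set
# parses as: use_best_model or (early_stopping_rounds and not use_eval_set)

new_condition = (use_best_model or isinstance(early_stopping_rounds, int)) and not use_eval_set

print(old_condition, new_condition)  # True False: the old form wrongly triggered the reset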
6 changes: 5 additions & 1 deletion fedot/core/pipelines/pipeline.py
@@ -27,6 +27,7 @@
 from fedot.core.visualisation.pipeline_specific_visuals import PipelineVisualizer
 from fedot.preprocessing.dummy_preprocessing import DummyPreprocessor
 from fedot.preprocessing.preprocessing import DataPreprocessor
+from fedot.utilities.composer_timer import fedot_composer_timer

 ERROR_PREFIX = 'Invalid pipeline configuration:'
@@ -187,13 +188,16 @@ def fit(self, input_data: Union[InputData, MultiModalData],
         if isinstance(input_data, InputData) and input_data.supplementary_data.is_auto_preprocessed:
             copied_input_data = deepcopy(input_data)
         else:
-            copied_input_data = self._preprocess(input_data)
+            with fedot_composer_timer.launch_preprocessing():
+                copied_input_data = self._preprocess(input_data)
+
         copied_input_data = self._assign_data_to_nodes(copied_input_data)

         if time_constraint is None:
             train_predicted = self._fit(input_data=copied_input_data)
         else:
             train_predicted = self._fit_with_time_limit(input_data=copied_input_data, time=time_constraint)
+
         return train_predicted

     @property
