922 preprocessor acceleration (#1004)
Preprocessor acceleration by @IIaKyJIuH and the ability to use single preprocessing from the API by @aPovidlo, plus other preprocessing fixes.
IIaKyJIuH authored Dec 11, 2023
1 parent 4ef1c8e commit da94b3e
Showing 35 changed files with 854 additions and 971 deletions.
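
The headline change is a new use_auto_preprocessing option (default False, registered in api_params_repository.py below). When it is enabled, the API preprocesses the whole InputData once up front and marks it as auto-preprocessed, rather than re-running preprocessing inside every evaluated pipeline. A minimal usage sketch, assuming the usual FedotBuilder construction and build() method; the CSV paths and target column are hypothetical:

from fedot.api.builder import FedotBuilder

model = (FedotBuilder(problem='classification')
         .setup_data_preprocessing(use_auto_preprocessing=True)  # new flag from this PR, off by default
         .build())
model.fit(features='train.csv', target='target')  # train data is preprocessed once at the API level
model.predict(features='test.csv')                # test data likewise, once before inference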
3 changes: 2 additions & 1 deletion .gitignore
@@ -1,4 +1,5 @@
.idea/
.vscode/
**/.pytest_cache/
**/__pycache__/
# Covers JetBrains IDEs: IntelliJ, RubyMine, PhpStorm, AppCode, PyCharm, CLion, Android Studio and WebStorm
@@ -76,4 +77,4 @@ dist/
test/unit/test_log.log
test/unit/catboost_info

local/
local/
54 changes: 54 additions & 0 deletions fedot/api/api_utils/api_data.py
@@ -1,7 +1,10 @@
import sys
from datetime import datetime
from typing import Dict, Union
from typing import Optional

import numpy as np
from golem.core.log import default_log

from fedot.api.api_utils.data_definition import data_strategy_selector, FeaturesType, TargetType
from fedot.core.data.data import InputData, OutputData, data_type_is_table
@@ -10,6 +13,7 @@
from fedot.core.pipelines.pipeline import Pipeline
from fedot.core.pipelines.ts_wrappers import in_sample_ts_forecast, convert_forecast_to_output
from fedot.core.repository.tasks import Task, TaskTypesEnum
from fedot.core.utils import convert_memory_size
from fedot.preprocessing.dummy_preprocessing import DummyPreprocessor
from fedot.preprocessing.preprocessing import DataPreprocessor

@@ -39,6 +43,8 @@ def __init__(self, task: Task, use_input_preprocessing: bool = True):
self._recommendations = {'cut': self.preprocessor.cut_dataset,
'label_encoded': self.preprocessor.label_encoding_for_fit}

self.log = default_log(self)

def define_data(self,
features: FeaturesType,
target: Optional[TargetType] = None,
@@ -123,3 +129,51 @@ def accept_and_apply_recommendations(self, input_data: Union[InputData, MultiMod
for name, rec in recommendations.items():
# Apply desired preprocessing function
self._recommendations[name](input_data, *rec.values())

def fit_transform(self, train_data: InputData) -> InputData:
start_time = datetime.now()
self.log.message('Preprocessing data')
memory_usage = convert_memory_size(sys.getsizeof(train_data.features))
features_shape = train_data.features.shape
target_shape = train_data.target.shape
self.log.message(
f'Train Data (Original) Memory Usage: {memory_usage} Data Shapes: {features_shape, target_shape}')

train_data = self.preprocessor.obligatory_prepare_for_fit(data=train_data)
train_data = self.preprocessor.optional_prepare_for_fit(pipeline=Pipeline(), data=train_data)
train_data = self.preprocessor.convert_indexes_for_fit(pipeline=Pipeline(), data=train_data)
train_data.supplementary_data.is_auto_preprocessed = True

memory_usage = convert_memory_size(sys.getsizeof(train_data.features))
features_shape = train_data.features.shape
target_shape = train_data.target.shape
self.log.message(
f'Train Data (Processed) Memory Usage: {memory_usage} Data Shapes: {features_shape, target_shape}')
self.log.message(f'Data preprocessing runtime = {datetime.now() - start_time}')

return train_data

def transform(self, test_data: InputData, current_pipeline: Pipeline) -> InputData:
start_time = datetime.now()
self.log.message('Preprocessing data')
memory_usage = convert_memory_size(sys.getsizeof(test_data.features))
features_shape = test_data.features.shape
target_shape = test_data.target.shape
self.log.message(
f'Test Data (Original) Memory Usage: {memory_usage} Data Shapes: {features_shape, target_shape}')

test_data = self.preprocessor.obligatory_prepare_for_predict(data=test_data)
test_data = self.preprocessor.optional_prepare_for_predict(pipeline=current_pipeline, data=test_data)
test_data = self.preprocessor.convert_indexes_for_predict(pipeline=current_pipeline, data=test_data)
test_data = self.preprocessor.update_indices_for_time_series(test_data)
test_data.supplementary_data.is_auto_preprocessed = True

memory_usage = convert_memory_size(sys.getsizeof(test_data.features))
features_shape = test_data.features.shape
target_shape = test_data.target.shape
self.log.message(
f'Test Data (Processed) Memory Usage: {memory_usage} Data Shapes: {features_shape, target_shape}')
self.log.message(f'Data preprocessing runtime = {datetime.now() - start_time}')

return test_data
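
These two methods are what fit() and predict() invoke when the option is on (see the main.py hunk further down). A hedged sketch of driving them directly, assuming the enclosing class here is ApiDataProcessor and using toy arrays:

import numpy as np

from fedot.api.api_utils.api_data import ApiDataProcessor
from fedot.core.repository.tasks import Task, TaskTypesEnum

x_train = np.array([[0, 'a'], [1, 'b'], [0, 'a'], [1, 'b']], dtype=object)
y_train = np.array([0, 1, 0, 1])

processor = ApiDataProcessor(task=Task(TaskTypesEnum.classification))
train = processor.define_data(features=x_train, target=y_train)
train = processor.fit_transform(train)  # obligatory + optional preprocessing, applied once
# At predict time the counterpart is (x_test and fitted_pipeline are hypothetical):
#   test = processor.define_data(features=x_test, is_predict=True)
#   test = processor.transform(test, current_pipeline=fitted_pipeline)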

1 change: 1 addition & 0 deletions fedot/api/api_utils/api_params_repository.py
@@ -62,6 +62,7 @@ def default_params_for_task(task_type: TaskTypesEnum) -> dict:
use_pipelines_cache=True,
use_preprocessing_cache=True,
use_input_preprocessing=True,
use_auto_preprocessing=False,
use_meta_rules=False,
cache_dir=default_fedot_data_dir(),
keep_history=True,
19 changes: 6 additions & 13 deletions fedot/api/api_utils/input_analyser.py
@@ -1,19 +1,16 @@
from functools import partial
from inspect import signature
from typing import Any, Dict, Tuple, Union

import numpy as np
from typing import Dict, Tuple, Any, Union

from golem.core.log import default_log

from fedot.core.composer.meta_rules import get_cv_folds_number, get_recommended_preset, \
get_early_stopping_generations
from fedot.core.composer.meta_rules import get_cv_folds_number, get_early_stopping_generations, get_recommended_preset
from fedot.core.data.data import InputData
from fedot.core.data.data_preprocessing import find_categorical_columns
from fedot.core.data.multi_modal import MultiModalData
from fedot.core.repository.dataset_types import DataTypesEnum


meta_rules = [get_cv_folds_number,
get_recommended_preset,
get_early_stopping_generations]
@@ -80,6 +77,7 @@ def _give_recommendations_for_data(self, input_data: InputData) -> Dict:
recommendations_for_data['cut'] = {'border': border}
is_label_encoding_needed = self.control_categorical(input_data)
if is_label_encoding_needed:
self._log.info('Switch categorical encoder to label encoder')
recommendations_for_data['label_encoded'] = {}
return recommendations_for_data

@@ -118,11 +116,6 @@ def control_categorical(self, input_data: InputData) -> bool:
"""

categorical_ids, _ = find_categorical_columns(input_data.features)
all_cardinality = 0
need_label = False
for idx in categorical_ids:
all_cardinality += np.unique(input_data.features[:, idx].astype(str)).shape[0]
if all_cardinality > self.max_cat_cardinality:
need_label = True
break
return need_label
# Sum the number of unique categories over all categorical columns
uniques_cats = sum(len(np.unique(col)) for col in input_data.features[:, categorical_ids].astype(str).T)
return uniques_cats > self.max_cat_cardinality
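
The replacement computes the summed cardinality of all categorical columns in a single vectorized pass instead of a manual accumulation loop. A self-contained toy check of the logic (the threshold value is hypothetical):

import numpy as np

features = np.array([['a', 'x'],
                     ['b', 'x'],
                     ['a', 'y']], dtype=object)
categorical_ids = [0, 1]
max_cat_cardinality = 100

# Column 0 has categories {'a', 'b'}, column 1 has {'x', 'y'}: total cardinality is 4
uniques_cats = sum(len(np.unique(col)) for col in features[:, categorical_ids].astype(str).T)
print(uniques_cats, uniques_cats > max_cat_cardinality)  # prints: 4 False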
2 changes: 2 additions & 0 deletions fedot/api/builder.py
@@ -330,6 +330,7 @@ def setup_data_preprocessing(
safe_mode: bool = DEFAULT_VALUE,
use_input_preprocessing: bool = DEFAULT_VALUE,
use_preprocessing_cache: bool = DEFAULT_VALUE,
use_auto_preprocessing: bool = DEFAULT_VALUE,
) -> FedotBuilder:
""" Sets parameters of input data preprocessing.
@@ -351,6 +352,7 @@
safe_mode=safe_mode,
use_input_preprocessing=use_input_preprocessing,
use_preprocessing_cache=use_preprocessing_cache,
use_auto_preprocessing=use_auto_preprocessing,
)
return self

15 changes: 12 additions & 3 deletions fedot/api/main.py
@@ -157,6 +157,9 @@ def fit(self,

self._init_remote_if_necessary()

if isinstance(self.train_data, InputData) and self.params.get('use_auto_preprocessing'):
self.train_data = self.data_processor.fit_transform(self.train_data)

if predefined_model is not None:
# Fit predefined model and return it without composing
self.current_pipeline = PredefinedModel(predefined_model, self.train_data, self.log,
@@ -176,9 +179,12 @@
else:
self.log.message('Already fitted initial pipeline is used')

# Store data encoder in the pipeline if it is required
# Merge the API and pipeline encoders if required
self.current_pipeline.preprocessor = BasePreprocessor.merge_preprocessors(
self.data_processor.preprocessor, self.current_pipeline.preprocessor)
api_preprocessor=self.data_processor.preprocessor,
pipeline_preprocessor=self.current_pipeline.preprocessor,
use_auto_preprocessing=self.params.get('use_auto_preprocessing')
)

self.log.message(f'Final pipeline: {graph_structure(self.current_pipeline)}')

@@ -259,6 +265,9 @@ def predict(self,
self.test_data = self.data_processor.define_data(target=self.target, features=features, is_predict=True)
self._is_in_sample_prediction = in_sample

if isinstance(self.test_data, InputData) and self.params.get('use_auto_preprocessing'):
self.test_data = self.data_processor.transform(self.test_data, self.current_pipeline)

self.prediction = self.data_processor.define_predictions(current_pipeline=self.current_pipeline,
test_data=self.test_data,
in_sample=self._is_in_sample_prediction,
@@ -522,4 +531,4 @@ def _train_pipeline_on_full_dataset(self, recommendations: Optional[dict],
self.current_pipeline.fit(
full_train_not_preprocessed,
n_jobs=self.params.n_jobs
)
)
9 changes: 6 additions & 3 deletions fedot/core/data/data.py
@@ -528,11 +528,14 @@ def subset_indices(self, selected_idx: List):
target=self.target[row_nums],
task=self.task, data_type=self.data_type)

def subset_features(self, features_ids: list):
"""Return new :obj:`InputData` with subset of features based on ``features_ids`` list
def subset_features(self, feature_ids: list) -> Optional[InputData]:
"""
Return a new :obj:`InputData` with a subset of features selected by a non-empty ``feature_ids`` list, or `None` otherwise
"""
if not feature_ids:
return None

subsample_features = self.features[:, features_ids]
subsample_features = self.features[:, feature_ids]
subsample_input = InputData(features=subsample_features,
data_type=self.data_type,
target=self.target, task=self.task,
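
A short usage sketch of the updated subset_features contract, assuming the standard InputData constructor and toy arrays:

import numpy as np

from fedot.core.data.data import InputData
from fedot.core.repository.dataset_types import DataTypesEnum
from fedot.core.repository.tasks import Task, TaskTypesEnum

data = InputData(idx=np.arange(3),
                 features=np.array([[1, 2, 3], [4, 5, 6], [7, 8, 9]]),
                 target=np.array([0, 1, 0]),
                 task=Task(TaskTypesEnum.classification),
                 data_type=DataTypesEnum.table)
subset = data.subset_features([0, 2])  # InputData containing feature columns 0 and 2
empty = data.subset_features([])       # empty selection now returns None instead of an empty-feature InputData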