Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Atomized model improvements #1232

Closed
wants to merge 22 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion examples/advanced/additional_learning.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
import pandas as pd

from fedot import Fedot
from fedot.core.operations.atomized_model import AtomizedModel
from fedot.core.operations.atomized_model.atomized_model import AtomizedModel
from fedot.core.pipelines.node import PipelineNode
from fedot.core.pipelines.pipeline import Pipeline
from fedot.core.pipelines.pipeline_builder import PipelineBuilder
Expand Down
19 changes: 14 additions & 5 deletions fedot/api/api_utils/api_params_repository.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
import datetime
from typing import Sequence

from fedot.core.optimisers.genetic_operators.mutation import fedot_single_edge_mutation, fedot_single_add_mutation, \
fedot_single_change_mutation, fedot_single_drop_mutation
from golem.core.optimisers.genetic.operators.inheritance import GeneticSchemeTypesEnum
from golem.core.optimisers.genetic.operators.mutation import MutationTypesEnum

Expand Down Expand Up @@ -128,13 +130,20 @@ def _get_default_mutations(task_type: TaskTypesEnum, params) -> Sequence[Mutatio
MutationTypesEnum.single_add,
MutationTypesEnum.single_edge]

# TODO remove workaround after boosting mutation fix
# Boosting mutation does not work due to problem with __eq__ with it copy.
# ``partial`` refactor to ``def`` does not work
# Also boosting mutation does not work by it own.
if task_type == TaskTypesEnum.ts_forecasting:
# TODO remove workaround after boosting mutation fix
# Boosting mutation does not work due to problem with __eq__ with it copy.
# ``partial`` refactor to ``def`` does not work
# Also boosting mutation does not work by it own.
# mutations.append(partial(boosting_mutation, params=params))
pass

# TODO remove when tests will ends
# add for testing purpose
mutations = [parameter_change_mutation,
fedot_single_edge_mutation,
fedot_single_add_mutation,
fedot_single_change_mutation,
fedot_single_drop_mutation]
else:
mutations.append(add_resample_mutation)

Expand Down
Empty file.
Original file line number Diff line number Diff line change
@@ -1,18 +1,11 @@
from collections import Counter
from datetime import timedelta
from functools import reduce
from operator import and_, or_
from typing import Any, Callable, Dict, List, Optional, Set, Union

from golem.core.tuning.simultaneous import SimultaneousTuner

from fedot.core.data.data import InputData, OutputData
from fedot.core.operations.operation import Operation
from fedot.core.operations.operation_parameters import OperationParameters
from fedot.core.pipelines.node import PipelineNode
from fedot.core.pipelines.pipeline import Pipeline
from fedot.core.pipelines.tuning.tuner_builder import TunerBuilder
from fedot.core.repository.metrics_repository import MetricCallable
from fedot.core.repository.operation_types_repository import OperationMetaInfo, atomized_model_type


Expand All @@ -26,6 +19,11 @@ def __init__(self, pipeline: 'Pipeline'):
super().__init__(operation_type=atomized_model_type())
self.pipeline = pipeline

@property
def acceptable_task_types(self):
    """Task types this atomized model can work with.

    Delegates to the operation held by the root node of the wrapped pipeline,
    since the root operation determines what the composite model can produce.
    """
    return self.pipeline.root_node.operation.acceptable_task_types

def fit(self, params: Optional[Union[OperationParameters, dict]], data: InputData) -> ('Pipeline', OutputData):
predicted_train = self.pipeline.fit(input_data=data)
fitted_atomized_operation = self.pipeline
Expand All @@ -50,27 +48,31 @@ def predict_for_fit(self,
return self.predict(fitted_operation, data, params, output_mode)

def fine_tune(self,
              metric_function: 'MetricCallable',
              input_data: Optional[InputData] = None,
              iterations: int = 50,
              timeout: int = 5) -> 'AtomizedModel':
    """ Method for tuning hyperparameters.

    NOTE(review): tuning is temporarily disabled below — the whole body is
    commented out, so the method currently does nothing and implicitly
    returns ``None`` instead of the annotated ``AtomizedModel``. Callers
    that rely on the return value will break; confirm before re-enabling.

    :param metric_function: metric used to evaluate tuning candidates
    :param input_data: data the tuner is built from and tuned on
    :param iterations: maximal number of tuning iterations
    :param timeout: tuning timeout in minutes
    """
    # TODO Fix tuner with atomized model
    # cannot be made by that way due to problem with circular import
    # TODO add tests for atomized tuning
    # origin test was removed
    # tuner = TunerBuilder(input_data.task) \
    #     .with_tuner(SimultaneousTuner) \
    #     .with_metric(metric_function) \
    #     .with_iterations(iterations) \
    #     .with_timeout(timedelta(minutes=timeout)) \
    #     .build(input_data)
    # tuned_pipeline = tuner.tune(self.pipeline)
    # tuned_atomized_model = AtomizedModel(tuned_pipeline)
    # return tuned_atomized_model

@property
def metadata(self) -> OperationMetaInfo:
root_node = self.pipeline.root_node

def extract_metadata_from_pipeline(attr_name: str,
node_filter: Optional[Callable[[PipelineNode], bool]] = None,
node_filter: Optional[Callable[['PipelineNode'], bool]] = None,
reduce_function: Optional[Callable[[Set], Set]] = None) -> List[Any]:
""" Extract metadata from atomized pipeline
:param attr_name: extracting metadata property
Expand Down Expand Up @@ -110,8 +112,8 @@ def description(self, operation_params: Optional[dict] = None) -> str:
operation_types = map(lambda node: node.operation.operation_type,
self.pipeline.nodes)
operation_types_dict = dict(Counter(operation_types))
return f'{operation_type}_length:{operation_length}_depth:{operation_depth}' \
f'_types:{operation_types_dict}_id:{operation_id}'
return (f'{self.__class__}({operation_type}_length:{operation_length}_depth:{operation_depth}'
f'_types:{operation_types_dict}_id:{operation_id})')

@staticmethod
def assign_tabular_column_types(output_data: OutputData,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ def __init__(self, node: PipelineNode = None, operation_id: int = None, nodes_fr
# Need use the imports inside the class because of the problem of circular imports.
from fedot.core.pipelines.pipeline import Pipeline
from fedot.core.pipelines.template import PipelineTemplate
from fedot.core.operations.atomized_model import AtomizedModel
from fedot.core.operations.atomized_model.atomized_model import AtomizedModel

super().__init__()
self.atomized_model_json_path = None
Expand Down
Empty file.
82 changes: 82 additions & 0 deletions fedot/core/optimisers/genetic_operators/mutation.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,82 @@
from copy import deepcopy
from functools import WRAPPER_ASSIGNMENTS
from random import choice
from typing import Dict, Callable

from golem.core.adapter import register_native
from golem.core.optimisers.genetic.operators.base_mutations import single_edge_mutation, single_add_mutation, \
single_change_mutation, single_drop_mutation
from golem.core.optimisers.graph import OptGraph
from golem.core.optimisers.optimization_parameters import GraphRequirements
from golem.core.optimisers.optimizer import GraphGenerationParams
from golem.core.optimisers.genetic.gp_params import GPAlgorithmParameters


def _extract_graphs(graph: OptGraph) -> Dict[str, OptGraph]:
    """Collect the graph itself and every graph nested in its atomized nodes.

    Returns a dict keyed by the uid of the node that stores each inner graph;
    the empty-string key maps to the top-level ``graph``.
    """
    collected = {'': graph}
    for node in graph.nodes:
        if 'inner_graph' not in node.content:
            continue
        # recurse into the nested graph; its own top-level entry ('' key)
        # is re-keyed by the uid of the node that holds it
        for uid, subgraph in _extract_graphs(node.content['inner_graph']).items():
            collected[uid or node.uid] = subgraph
    return collected


def _insert_graphs(full_graph: OptGraph, node_uid: str, graph: OptGraph) -> OptGraph:
    """Insert ``graph`` into ``full_graph`` at the atomized node with uid ``node_uid``.

    An empty ``node_uid`` denotes the top-level graph, in which case ``graph``
    simply replaces ``full_graph``.

    :raises ValueError: if no node with ``node_uid`` exists (searching nested
        atomized graphs too) or the found node does not hold an inner graph.
    """
    if node_uid == '':
        return graph
    # depth-first search over full_graph and all nested atomized graphs
    nodes = full_graph.nodes[:]
    while nodes:
        node = nodes.pop()
        if node.uid == node_uid:
            break
        if 'inner_graph' in node.content:
            nodes.extend(node.content['inner_graph'].nodes)
    else:
        raise ValueError(f"Unknown node uid: {node_uid}")
    if 'inner_graph' not in node.content:
        raise ValueError('Cannot insert graph to non AtomizedModel')
    node.content['inner_graph'] = graph
    return full_graph


MutationFun = Callable[[OptGraph, GraphRequirements, GraphGenerationParams, GPAlgorithmParameters], OptGraph]


def atomized_mutation(mutation_fun: MutationFun) -> MutationFun:
    """Wrap a GOLEM mutation so it can also act on graphs nested in atomized nodes.

    The returned mutation picks, uniformly at random, either the top-level
    graph or one of the inner graphs stored in atomized nodes, applies
    ``mutation_fun`` to it, and splices the result back into a deep copy of
    the original graph.
    """
    def mutation_for_atomized_graph(graph: OptGraph,
                                    requirements: GraphRequirements,
                                    graph_gen_params: GraphGenerationParams,
                                    parameters: GPAlgorithmParameters,
                                    ) -> OptGraph:
        # work on a copy so the incoming graph is never mutated in place
        graph = deepcopy(graph)
        graphs = _extract_graphs(graph)
        # '' key is the top-level graph; any other key is an atomized node uid
        node_uid, graph_to_mutate = choice(list(graphs.items()))

        mutated_graph = mutation_fun(graph=graph_to_mutate,
                                     requirements=requirements,
                                     graph_gen_params=graph_gen_params,
                                     parameters=parameters)

        new_graph = _insert_graphs(graph, node_uid, mutated_graph)
        return new_graph

    # TODO use functools.wraps; currently it breaks something in GOLEM,
    #  so copy the wrapped function's metadata manually instead.
    for attr in WRAPPER_ASSIGNMENTS:
        setattr(mutation_for_atomized_graph, attr, getattr(mutation_fun, attr))
    mutation_for_atomized_graph.__wrapped__ = mutation_fun

    return register_native(mutation_for_atomized_graph)


# Atomized-model-aware counterparts of GOLEM's standard single mutations:
# each may mutate either the top-level graph or any nested atomized graph.
fedot_single_edge_mutation = atomized_mutation(single_edge_mutation)
fedot_single_add_mutation = atomized_mutation(single_add_mutation)
fedot_single_change_mutation = atomized_mutation(single_change_mutation)
fedot_single_drop_mutation = atomized_mutation(single_drop_mutation)
31 changes: 21 additions & 10 deletions fedot/core/pipelines/adapters.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,12 @@
from copy import deepcopy
from typing import Any, Optional, Dict

from fedot.core.operations.atomized_model.atomized_model import AtomizedModel
from golem.core.adapter import BaseOptimizationAdapter
from golem.core.dag.graph_utils import map_dag_nodes
from golem.core.optimisers.graph import OptGraph, OptNode
from golem.core.optimisers.graph import OptGraph

from fedot.core.pipelines.node import PipelineNode
from fedot.core.pipelines.node import PipelineNode, OptNode
from fedot.core.pipelines.pipeline import Pipeline


Expand All @@ -17,6 +18,8 @@ class PipelineAdapter(BaseOptimizationAdapter[Pipeline]):
fitted models) that can be used for reconstructing Pipelines.
"""

# TODO add tests for correct convertation of AtomizedModel

def __init__(self, use_input_preprocessing: bool = True):
super().__init__(base_graph_class=Pipeline)

Expand All @@ -25,17 +28,25 @@ def __init__(self, use_input_preprocessing: bool = True):
@staticmethod
def _transform_to_opt_node(node: PipelineNode) -> OptNode:
    """Convert a ``PipelineNode`` into an ``OptNode``, keeping only simple data.

    NOTE(review): this span interleaved the pre- and post-change bodies from
    the rendered diff; only the updated implementation is kept here.
    """
    # deepcopy params/metadata so the opt graph and pipeline never share state
    content = dict(name=str(node.operation),
                   params=deepcopy(node.parameters),
                   metadata=deepcopy(node.metadata))

    # add data about inner graph if it is atomized model
    if isinstance(node.operation, AtomizedModel):
        content['inner_graph'] = PipelineAdapter()._adapt(node.operation.pipeline)

    return OptNode(content)

@staticmethod
def _transform_to_pipeline_node(node: OptNode) -> PipelineNode:
    """Convert an ``OptNode`` back into a ``PipelineNode``.

    Nodes carrying an ``'inner_graph'`` entry are restored into an
    ``AtomizedModel`` wrapping the reconstructed inner pipeline.

    NOTE(review): this span interleaved the pre- and post-change bodies from
    the rendered diff; only the updated implementation is kept here.
    """
    if 'inner_graph' in node.content:
        atomized_pipeline = PipelineAdapter()._restore(node.content['inner_graph'])
        return PipelineNode(AtomizedModel(atomized_pipeline))
    else:
        # deepcopy to avoid accidental information sharing between opt graphs & pipelines
        content = deepcopy(node.content)
        return PipelineNode(operation_type=content['name'], content=content)

def _adapt(self, adaptee: Pipeline) -> OptGraph:
adapted_nodes = map_dag_nodes(self._transform_to_opt_node, adaptee.nodes)
Expand Down
24 changes: 24 additions & 0 deletions fedot/core/pipelines/node.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,11 @@
from typing import Any, List, Optional, Tuple, Union

import numpy as np

from fedot.core.operations.atomized_model.atomized_model import AtomizedModel
from golem.core.dag.linked_graph_node import LinkedGraphNode
from golem.core.log import default_log
from golem.core.optimisers.graph import OptNode as GolemOptNode
from golem.core.optimisers.timer import Timer
from golem.serializers.serializer import register_serializable

Expand All @@ -30,6 +33,18 @@ class NodeMetadata:
metric: Optional[float] = None


class OptNode(GolemOptNode):
    """Wrapper over GOLEM's ``OptNode`` that can describe nodes holding an AtomizedModel.

    Used by ``PipelineAdapter`` when converting a pipeline to the GOLEM graph
    representation, so that a node's description reflects its inner graph.
    """
    def description(self) -> str:
        # TODO add test
        node_label = super().description()
        # append the inner graph's descriptive ids so two atomized nodes with
        # different inner pipelines get distinct descriptions
        if 'inner_graph' in self.content:
            root_nodes = self.content['inner_graph'].root_nodes()
            node_label = f"{node_label}(INNER{''.join(node.descriptive_id for node in root_nodes)}INNER)"
        return node_label


class PipelineNode(LinkedGraphNode):
"""The class defines the interface of nodes modifying tha data flow in the :class:`Pipeline`

Expand Down Expand Up @@ -82,6 +97,15 @@ def is_primary(self):
if not self.nodes_from or len(self.nodes_from) == 0:
return True

def description(self) -> str:
    """Describe the node; atomized models additionally embed their inner graph ids."""
    # TODO add test
    # TODO there is description in `Operation` why is it not used?
    label = super().description()
    if isinstance(self.operation, AtomizedModel):
        inner_ids = ''.join(n.descriptive_id for n in self.operation.pipeline.root_nodes())
        label = f"{label}(INNER{inner_ids}INNER)"
    return label

def _process_content_init(self, passed_content: dict) -> Operation:
""" Updating content in the node """
if isinstance(passed_content['name'], str):
Expand Down
4 changes: 2 additions & 2 deletions fedot/core/pipelines/template.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
import numpy as np
from golem.core.log import default_log

from fedot.core.operations.atomized_template import AtomizedModelTemplate
from fedot.core.operations.atomized_model.atomized_template import AtomizedModelTemplate
from fedot.core.operations.operation_template import OperationTemplate, check_existing_path
from fedot.core.pipelines.node import PipelineNode

Expand Down Expand Up @@ -316,7 +316,7 @@ def convert_to_pipeline(self, pipeline: 'Pipeline', path: str = None, dict_fitte
if preprocessor_file:
try:
pipeline.preprocessor = joblib.load(preprocessor_file)
except ModuleNotFoundError as ex:
except ModuleNotFoundError:
self.log.warning(f'Could not load preprocessor from file `{preprocessor_file}` '
f'due to legacy incompatibility. Please refit the preprocessor.')
else:
Expand Down
2 changes: 1 addition & 1 deletion fedot/core/pipelines/verification_rules.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
from typing import Optional

from fedot.core.operations.atomized_model import AtomizedModel
from fedot.core.operations.atomized_model.atomized_model import AtomizedModel
from fedot.core.operations.model import Model
from fedot.core.pipelines.node import PipelineNode
from fedot.core.pipelines.pipeline import Pipeline
Expand Down
4 changes: 2 additions & 2 deletions fedot/core/repository/data/data_operation_repository.json
Original file line number Diff line number Diff line change
Expand Up @@ -224,12 +224,12 @@
"ransac_lin_reg": {
"meta": "regression_preprocessing",
"presets": ["fast_train", "*tree"],
"tags": ["affects_target", "linear", "filtering", "correct_params", "non_applicable_for_ts"]
"tags": ["affects_target", "linear", "filtering", "non-default", "correct_params", "non_applicable_for_ts"]
},
"ransac_non_lin_reg": {
"meta": "regression_preprocessing",
"presets": ["*tree"],
"tags": ["affects_target", "non_linear", "filtering",
"tags": ["affects_target", "non_linear", "filtering", "non-default",
"correct_params", "non_applicable_for_ts"]
},
"isolation_forest_reg": {
Expand Down
Loading
Loading