Skip to content

[NEAT-874] 🔨 Interface for state #1077

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 13 commits into
base: main
Choose a base branch
from
3 changes: 2 additions & 1 deletion cognite/neat/_graph/transformers/__init__.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
from ._base import BaseTransformerStandardised
from ._base import BaseTransformer, BaseTransformerStandardised
from ._classic_cdf import (
AddAssetDepth,
AssetEventConnector,
Expand Down Expand Up @@ -27,6 +27,7 @@
"AssetSequenceConnector",
"AssetTimeSeriesConnector",
"AttachPropertyFromTargetToSource",
"BaseTransformer",
"ConnectionToLiteral",
"ConvertLiteral",
"LiteralToEntity",
Expand Down
4 changes: 4 additions & 0 deletions cognite/neat/_state/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
from ._base import NeatStateManager
from ._types import Action

__all__ = ["Action", "NeatStateManager"]
90 changes: 90 additions & 0 deletions cognite/neat/_state/_base.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,90 @@
from pathlib import Path

from cognite.neat._client import NeatClient
from cognite.neat._graph.extractors import BaseExtractor
from cognite.neat._graph.loaders import BaseLoader
from cognite.neat._graph.transformers import BaseTransformer
from cognite.neat._issues import IssueList
from cognite.neat._rules.exporters import BaseExporter, CDFExporter
from cognite.neat._rules.exporters._base import T_Export, T_VerifiedRules
from cognite.neat._rules.importers import BaseImporter
from cognite.neat._rules.transformers import VerifiedRulesTransformer
from cognite.neat._store import NeatGraphStore, NeatRulesStore
from cognite.neat._utils.upload import UploadResultList

from ._state import EmptyState, InternalState
from ._types import Action
from .exception import InvalidStateTransition


# Todo: This class is in progress and not currently used. Through a series of PRs, it will replace the
# SessionState class as well as move the logic from the _session into this _state module.
class NeatStateManager:
"""The neat state contains three main components:

- Instances: stored in a triple store.
- Conceptual rules: The schema for conceptual rules.
- Physical rules: The schema for physical rules.
"""

def __init__(self) -> None:
self._rule_store = NeatRulesStore()
self._graph_store = NeatGraphStore.from_memory_store()
self._state: InternalState = EmptyState(self._rule_store, self._graph_store)

@property
def status(self) -> str:
"""Returns the display name of the current state."""
return self._state.display_name

def change(self, action: Action) -> IssueList:
Copy link
Collaborator

@nikokaoja nikokaoja Apr 25, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is change in terms of what we define as change in Provenance, therefore, your argument is not action, but Agent (extractor, loader, transformer) that is configured to perform certain Activity, so this should be called activity not action

"""Perform an action on the current state.

This methods checks if the action is valid for the current state, performs the action, and if successful,
transitions to the next state. If the action is not valid, it raises an InvalidStateTransition error.

Args:
action (Action): The action to perform.

Raises:
InvalidStateTransition: If the action is not valid for the current state.
TypeError: If the action is of an unknown type.

Returns:
IssueList: The issues encountered during the action.

"""
if not self._state.is_valid_transition(action):
raise InvalidStateTransition(
f"Cannot perform {type(action).__name__} action in state {self._state.display_name}"
)
if isinstance(action, BaseImporter):
issues = self._rule_store.import_rules(action)
elif isinstance(action, BaseExtractor):
issues = self._graph_store.write(action)
elif isinstance(action, VerifiedRulesTransformer):
issues = self._rule_store.transform(action)
elif isinstance(action, BaseTransformer):
# The self._graph_store.transform(action) does not return IssueList
raise NotImplementedError()
else:
raise TypeError(f"Unknown action type: {type(action).__name__}")
if not issues.has_errors:
self._state = self._state.next_state(action)
return issues

def export(self, exporter: BaseExporter[T_VerifiedRules, T_Export]) -> T_Export: # type: ignore[type-arg, type-var]
"""Export the rules to the specified format."""
raise NotImplementedError

def export_to_file(self, exporter: BaseExporter, path: Path) -> None:
"""Export the rules to a file."""
raise NotImplementedError

def export_to_cdf(self, exporter: CDFExporter, client: NeatClient, dry_run: bool) -> UploadResultList:
"""Export the rules to CDF."""
raise NotImplementedError

def load(self, loader: BaseLoader) -> UploadResultList:
"""Load the instances into CDF."""
raise NotImplementedError
71 changes: 71 additions & 0 deletions cognite/neat/_state/_state.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
from abc import ABC, abstractmethod

from cognite.neat._graph.extractors import BaseExtractor
from cognite.neat._graph.transformers import BaseTransformer
from cognite.neat._rules.importers import BaseImporter
from cognite.neat._rules.transformers import InformationToDMS, VerifiedRulesTransformer
from cognite.neat._store import NeatGraphStore, NeatRulesStore

from ._types import Action


class InternalState(ABC):
"""This is the base class for all internal states (internal to this module)

This implements a state machine which is used by the NeatState which is the
external (related to this module) API.
"""

def __init__(self, rule_store: NeatRulesStore, graph_store: NeatGraphStore) -> None:
self._rule_store = rule_store
self._graph_store = graph_store

@property
def display_name(self) -> str:
return type(self).__name__.removesuffix("State")

@abstractmethod
def is_valid_transition(self, action: Action) -> bool:
raise NotImplementedError()

@abstractmethod
def next_state(self, action: Action) -> "InternalState":
raise NotImplementedError()


class EmptyState(InternalState):
def is_valid_transition(self, action: Action) -> bool:
return isinstance(action, BaseImporter | BaseExtractor)

def next_state(self, action: Action) -> "InternalState":
if isinstance(action, BaseExtractor):
return InstancesState(self._rule_store, self._graph_store)
elif isinstance(action, BaseImporter):
return ConceptualState(self._rule_store, self._graph_store)
raise NotImplementedError()


class InstancesState(InternalState):
def is_valid_transition(self, action: Action) -> bool:
return isinstance(action, BaseTransformer)

def next_state(self, action: Action) -> "InternalState":
raise NotImplementedError()


class ConceptualState(InternalState):
def is_valid_transition(self, action: Action) -> bool:
return isinstance(action, VerifiedRulesTransformer)

def next_state(self, action: Action) -> "InternalState":
if isinstance(action, InformationToDMS):
return PhysicalState(self._rule_store, self._graph_store)
return self


class PhysicalState(InternalState):
def is_valid_transition(self, action: Action) -> bool:
return isinstance(action, VerifiedRulesTransformer)

def next_state(self, action: Action) -> "InternalState":
return self
8 changes: 8 additions & 0 deletions cognite/neat/_state/_types.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
from typing import TypeAlias

from cognite.neat._graph.extractors import BaseExtractor
from cognite.neat._graph.transformers import BaseTransformer, BaseTransformerStandardised
from cognite.neat._rules.importers import BaseImporter
from cognite.neat._rules.transformers import RulesTransformer

Action: TypeAlias = BaseImporter | BaseExtractor | RulesTransformer | BaseTransformerStandardised | BaseTransformer
1 change: 1 addition & 0 deletions cognite/neat/_state/exception.py
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
class InvalidStateTransition(RuntimeError): ...
104 changes: 104 additions & 0 deletions tests/tests_unit/test_state.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,104 @@
from collections.abc import Iterable
from pathlib import Path

import pytest
from rdflib import Graph

from cognite.neat._graph.extractors import BaseExtractor
from cognite.neat._graph.loaders import BaseLoader
from cognite.neat._graph.loaders._base import _END_OF_CLASS, _START_OF_CLASS
from cognite.neat._graph.transformers import BaseTransformer
from cognite.neat._issues import NeatIssue
from cognite.neat._rules._shared import ReadRules
from cognite.neat._rules.exporters import BaseExporter
from cognite.neat._rules.importers import BaseImporter
from cognite.neat._rules.models import DMSInputRules, InformationInputRules, InformationRules
from cognite.neat._rules.models.dms import DMSInputContainer, DMSInputMetadata, DMSInputProperty, DMSInputView
from cognite.neat._rules.models.information import (
InformationInputClass,
InformationInputMetadata,
InformationInputProperty,
)
from cognite.neat._rules.transformers import RulesTransformer
from cognite.neat._shared import Triple
from cognite.neat._state import Action, NeatStateManager


class DummyInfoImporter(BaseImporter):
def to_rules(self) -> ReadRules[InformationInputRules]:
return ReadRules(
rules=InformationInputRules(
metadata=InformationInputMetadata("my_space", "MySpace", "v1", "doctrino"),
properties=[InformationInputProperty("Thing", "name", "text")],
classes=[InformationInputClass("Thing")],
),
read_context={},
)


class DummyDMSImporter(BaseImporter):
def to_rules(self) -> ReadRules[DMSInputRules]:
return ReadRules(
rules=DMSInputRules(
metadata=DMSInputMetadata("my_space", "MySpace", "v1", "doctrino"),
properties=[DMSInputProperty("Thing", "name", "text", container="Thing", container_property="name")],
views=[DMSInputView("Thing")],
containers=[DMSInputContainer("Thing")],
),
read_context={},
)


class NoOptExporter(BaseExporter[InformationRules, str]):
def export_to_file(self, rules: InformationRules, filepath: Path) -> None:
return None

def export(self, rules: InformationRules) -> str:
return ""


class NoOptExtractor(BaseExtractor):
def extract(self) -> Iterable[Triple]:
return []


class NoOptTransformer(BaseTransformer):
def transform(self, graph: Graph) -> None:
return None


class NoOptRulesTransformer(RulesTransformer[InformationRules, InformationInputRules]):
def transform(self, rules: InformationRules) -> InformationRules:
return rules


class NoOptLoader(BaseLoader[str]):
def _load(
self, stop_on_exception: bool = False
) -> Iterable[str | NeatIssue | type[_END_OF_CLASS] | _START_OF_CLASS]:
yield ""
return

def write_to_file(self, filepath: Path) -> None:
return None


@pytest.fixture(scope="function")
def empty_state() -> NeatStateManager:
"""Fixture for creating an empty NeatState instance."""
return NeatStateManager()


class TestNeatState:
@pytest.mark.parametrize(
"actions, expected_state",
[
pytest.param([DummyInfoImporter()], "Conceptual", id="Import information rules"),
pytest.param([NoOptExtractor()], "Instances", id="Extract instances"),
],
)
def test_valid_change(self, actions: list[Action], expected_state: str, empty_state: NeatStateManager) -> None:
for action in actions:
_ = empty_state.change(action)

assert empty_state.status == expected_state, "State did not change as expected."
Loading