Skip to content

Commit

Permalink
Clean parameters for the datanode track_edit method. (#2240)
Browse files Browse the repository at this point in the history
* Clean parameters for the datanode track_edit method.

* Add unit tests and auto-set the edits.

* Update taipy/core/data/csv.py

* Fix kwargs typing

* Remove type ignore
  • Loading branch information
jrobinAV authored Nov 14, 2024
1 parent 63d508e commit 4936044
Show file tree
Hide file tree
Showing 9 changed files with 138 additions and 52 deletions.
7 changes: 3 additions & 4 deletions taipy/core/data/csv.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@

from .._entity._reload import _Reloader
from .._version._version_manager_factory import _VersionManagerFactory
from ..job.job_id import JobId
from ._file_datanode_mixin import _FileDataNodeMixin
from ._tabular_datanode_mixin import _TabularDataNodeMixin
from .data_node import DataNode
Expand Down Expand Up @@ -116,16 +115,16 @@ def __init__(
def storage_type(cls) -> str:
return cls.__STORAGE_TYPE

def write_with_column_names(self, data: Any, columns: Optional[List[str]] = None, job_id: Optional[JobId] = None):
def write_with_column_names(self, data: Any, columns: Optional[List[str]] = None, editor_id: Optional[str] = None):
"""Write a selection of columns.
Arguments:
data (Any): The data to write.
columns (Optional[List[str]]): The list of column names to write.
job_id (JobId): An optional identifier of the writer.
editor_id (str): An optional identifier of the writer.
"""
self._write(data, columns)
self.track_edit(timestamp=datetime.now(), job_id=job_id)
self.track_edit(editor_id=editor_id, timestamp=datetime.now())

def _read(self):
return self._read_from_path()
Expand Down
55 changes: 38 additions & 17 deletions taipy/core/data/data_node.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@

import functools
import os
import typing
import uuid
from abc import abstractmethod
from datetime import datetime, timedelta
Expand All @@ -33,7 +34,7 @@
from ..notification.event import Event, EventEntityType, EventOperation, _make_event
from ..reason import DataNodeEditInProgress, DataNodeIsNotWritten
from ._filter import _FilterDataNode
from .data_node_id import DataNodeId, Edit
from .data_node_id import EDIT_COMMENT_KEY, EDIT_EDITOR_ID_KEY, EDIT_JOB_ID_KEY, EDIT_TIMESTAMP_KEY, DataNodeId, Edit
from .operator import JoinOperator


Expand Down Expand Up @@ -200,6 +201,11 @@ def edits(self) -> List[Edit]:
"""
return self._edits

@edits.setter # type: ignore
@_self_setter(_MANAGER_NAME)
def edits(self, val):
self._edits = val

@property # type: ignore
@_self_reload(_MANAGER_NAME)
def last_edit_date(self) -> Optional[datetime]:
Expand Down Expand Up @@ -415,29 +421,29 @@ def read(self) -> Any:
)
return None

def append(self, data, job_id: Optional[JobId] = None, **kwargs: Dict[str, Any]):
def append(self, data, editor_id: Optional[str] = None, **kwargs: Any):
"""Append some data to this data node.
Arguments:
data (Any): The data to write to this data node.
job_id (JobId): An optional identifier of the writer.
**kwargs (dict[str, any]): Extra information to attach to the edit document
editor_id (str): An optional identifier of the editor.
**kwargs (Any): Extra information to attach to the edit document
corresponding to this write.
"""
from ._data_manager_factory import _DataManagerFactory

self._append(data)
self.track_edit(job_id=job_id, **kwargs)
self.track_edit(editor_id=editor_id, **kwargs)
self.unlock_edit()
_DataManagerFactory._build_manager()._set(self)

def write(self, data, job_id: Optional[JobId] = None, **kwargs: Dict[str, Any]):
def write(self, data, job_id: Optional[JobId] = None, **kwargs: Any):
"""Write some data to this data node.
Arguments:
data (Any): The data to write to this data node.
job_id (JobId): An optional identifier of the writer.
**kwargs (dict[str, any]): Extra information to attach to the edit document
job_id (JobId): An optional identifier of the job writing the data.
**kwargs (Any): Extra information to attach to the edit document
corresponding to this write.
"""
from ._data_manager_factory import _DataManagerFactory
Expand All @@ -447,20 +453,35 @@ def write(self, data, job_id: Optional[JobId] = None, **kwargs: Dict[str, Any]):
self.unlock_edit()
_DataManagerFactory._build_manager()._set(self)

def track_edit(self, **options):
def track_edit(self,
job_id: Optional[str] = None,
editor_id: Optional[str] = None,
timestamp: Optional[datetime] = None,
comment: Optional[str] = None,
**options: Any):
"""Creates and adds a new entry in the edits attribute without writing the data.
Arguments:
options (dict[str, any]): track `timestamp`, `comments`, `job_id`. The others are user-custom, users can
use options to attach any information to an external edit of a data node.
job_id (Optional[str]): The optional identifier of the job writing the data.
editor_id (Optional[str]): The optional identifier of the editor writing the data.
timestamp (Optional[datetime]): The optional timestamp of the edit. If not provided, the
current time is used.
comment (Optional[str]): The optional comment of the edit.
**options (Any): User-custom attributes to attach to the edit.
"""
edit = {k: v for k, v in options.items() if v is not None}
if "timestamp" not in edit:
edit["timestamp"] = (
self._get_last_modified_datetime(self._properties.get(self._PATH_KEY, None)) or datetime.now()
)
self.last_edit_date = edit.get("timestamp")
self._edits.append(edit)
if job_id:
edit[EDIT_JOB_ID_KEY] = job_id
if editor_id:
edit[EDIT_EDITOR_ID_KEY] = editor_id
if comment:
edit[EDIT_COMMENT_KEY] = comment
if not timestamp:
timestamp = self._get_last_modified_datetime(self._properties.get(self._PATH_KEY)) or datetime.now()
edit[EDIT_TIMESTAMP_KEY] = timestamp
self.last_edit_date = edit.get(EDIT_TIMESTAMP_KEY)
self._edits.append(typing.cast(Edit, edit))
self.edits = self._edits

def lock_edit(self, editor_id: Optional[str] = None):
"""Lock the data node modification.
Expand Down
4 changes: 4 additions & 0 deletions taipy/core/data/data_node_id.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,3 +17,7 @@
Edit = NewType("Edit", Dict[str, Any])
"""Type that holds a `DataNode^` edit information."""
Edit.__doc__ = """Type that holds a `DataNode^` edit information."""
EDIT_TIMESTAMP_KEY = "timestamp"
EDIT_JOB_ID_KEY = "job_id"
EDIT_COMMENT_KEY = "comment"
EDIT_EDITOR_ID_KEY = "editor_id"
7 changes: 3 additions & 4 deletions taipy/core/data/excel.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@
from .._entity._reload import _Reloader
from .._version._version_manager_factory import _VersionManagerFactory
from ..exceptions.exceptions import ExposedTypeLengthMismatch, NonExistingExcelSheet, SheetNameLengthMismatch
from ..job.job_id import JobId
from ._file_datanode_mixin import _FileDataNodeMixin
from ._tabular_datanode_mixin import _TabularDataNodeMixin
from .data_node import DataNode
Expand Down Expand Up @@ -119,13 +118,13 @@ def storage_type(cls) -> str:
"""Return the storage type of the data node: "excel"."""
return cls.__STORAGE_TYPE

def write_with_column_names(self, data: Any, columns: List[str] = None, job_id: Optional[JobId] = None) -> None:
def write_with_column_names(self, data: Any, columns: List[str] = None, editor_id: Optional[str] = None) -> None:
"""Write a set of columns.
Arguments:
data (Any): The data to write.
columns (List[str]): The list of column names to write.
job_id (Optional[JobId]): An optional identifier of the writer.
editor_id (Optional[str]): An optional identifier of the writer.
"""
if isinstance(data, Dict) and all(isinstance(x, (pd.DataFrame, np.ndarray)) for x in data.values()):
self._write_excel_with_multiple_sheets(data, columns=columns)
Expand All @@ -134,7 +133,7 @@ def write_with_column_names(self, data: Any, columns: List[str] = None, job_id:
if columns:
df = self._set_column_if_dataframe(df, columns)
self._write_excel_with_single_sheet(df.to_excel, self.path, index=False)
self.track_edit(timestamp=datetime.now(), job_id=job_id)
self.track_edit(timestamp=datetime.now(), editor_id=editor_id)

@staticmethod
def _check_exposed_type(exposed_type):
Expand Down
7 changes: 3 additions & 4 deletions taipy/core/data/parquet.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@
from .._entity._reload import _Reloader
from .._version._version_manager_factory import _VersionManagerFactory
from ..exceptions.exceptions import UnknownCompressionAlgorithm, UnknownParquetEngine
from ..job.job_id import JobId
from ._file_datanode_mixin import _FileDataNodeMixin
from ._tabular_datanode_mixin import _TabularDataNodeMixin
from .data_node import DataNode
Expand Down Expand Up @@ -163,14 +162,14 @@ def storage_type(cls) -> str:
"""Return the storage type of the data node: "parquet"."""
return cls.__STORAGE_TYPE

def _write_with_kwargs(self, data: Any, job_id: Optional[JobId] = None, **write_kwargs):
def _write_with_kwargs(self, data: Any, editor_id: Optional[str] = None, **write_kwargs):
"""Write the data referenced by this data node.
Keyword arguments here which are also present in the Data Node config will overwrite them.
Arguments:
data (Any): The data to write.
job_id (JobId): An optional identifier of the writer.
editor_id (str): An optional identifier of the writer.
**write_kwargs (dict[str, any]): The keyword arguments passed to the function
`pandas.DataFrame.to_parquet()`.
"""
Expand All @@ -189,7 +188,7 @@ def _write_with_kwargs(self, data: Any, job_id: Optional[JobId] = None, **write_
# Ensure that the columns are strings, otherwise writing will fail with pandas 1.3.5
df.columns = df.columns.astype(str)
df.to_parquet(self._path, **kwargs)
self.track_edit(timestamp=datetime.now(), job_id=job_id)
self.track_edit(timestamp=datetime.now(), editor_id=editor_id)

def read_with_kwargs(self, **read_kwargs):
"""Read data from this data node.
Expand Down
9 changes: 5 additions & 4 deletions taipy/gui_core/_context.py
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@
from taipy.core import get as core_get
from taipy.core import submit as core_submit
from taipy.core.data._file_datanode_mixin import _FileDataNodeMixin
from taipy.core.data.data_node_id import EDIT_COMMENT_KEY, EDIT_EDITOR_ID_KEY, EDIT_JOB_ID_KEY, EDIT_TIMESTAMP_KEY
from taipy.core.notification import CoreEventConsumerBase, EventEntityType
from taipy.core.notification.event import Event, EventOperation
from taipy.core.notification.notifier import Notifier
Expand Down Expand Up @@ -993,7 +994,7 @@ def get_data_node_history(self, id: str):
if id and (dn := core_get(id)) and isinstance(dn, DataNode):
res = []
for e in dn.edits:
job_id = e.get("job_id")
job_id = e.get(EDIT_JOB_ID_KEY)
job: t.Optional[Job] = None
if job_id:
if not (reason := is_readable(job_id)):
Expand All @@ -1002,11 +1003,11 @@ def get_data_node_history(self, id: str):
job = core_get(job_id)
res.append(
(
e.get("timestamp"),
job_id if job_id else e.get("writer_identifier", ""),
e.get(EDIT_TIMESTAMP_KEY),
job_id if job_id else e.get(EDIT_EDITOR_ID_KEY, ""),
f"Execution of task {job.task.get_simple_label()}."
if job and job.task
else e.get("comment", ""),
else e.get(EDIT_COMMENT_KEY, ""),
)
)
return sorted(res, key=lambda r: r[0], reverse=True)
Expand Down
66 changes: 64 additions & 2 deletions tests/core/data/test_data_node.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,13 @@
from taipy.core.data._data_manager import _DataManager
from taipy.core.data._data_manager_factory import _DataManagerFactory
from taipy.core.data.data_node import DataNode
from taipy.core.data.data_node_id import DataNodeId
from taipy.core.data.data_node_id import (
EDIT_COMMENT_KEY,
EDIT_EDITOR_ID_KEY,
EDIT_JOB_ID_KEY,
EDIT_TIMESTAMP_KEY,
DataNodeId,
)
from taipy.core.data.in_memory import InMemoryDataNode
from taipy.core.exceptions.exceptions import DataNodeIsBeingEdited, NoData
from taipy.core.job.job_id import JobId
Expand Down Expand Up @@ -667,7 +673,7 @@ def test_path_populated_with_config_default_path(self):
data_node.path = "baz.p"
assert data_node.path == "baz.p"

def test_track_edit(self):
def test_edit_edit_tracking(self):
dn_config = Config.configure_data_node("A")
data_node = _DataManager._bulk_get_or_create([dn_config])[dn_config]

Expand Down Expand Up @@ -745,3 +751,59 @@ def test_change_data_node_name(self):
# This new syntax will be the only one allowed: https://github.com/Avaiga/taipy-core/issues/806
dn.properties["name"] = "baz"
assert dn.name == "baz"

def test_track_edit(self):
dn_config = Config.configure_data_node("A")
data_node = _DataManager._bulk_get_or_create([dn_config])[dn_config]

before = datetime.now()
data_node.track_edit(job_id="job_1")
data_node.track_edit(editor_id="editor_1")
data_node.track_edit(comment="This is a comment on this edit")
data_node.track_edit(editor_id="editor_2", comment="This is another comment on this edit")
data_node.track_edit(editor_id="editor_3", foo="bar")
after = datetime.now()
timestamp = datetime.now()
data_node.track_edit(timestamp=timestamp)
_DataManagerFactory._build_manager()._set(data_node)
# To save the edits because track edit does not save the data node

assert len(data_node.edits) == 6
assert data_node.edits[-1] == data_node.get_last_edit()
assert data_node.last_edit_date == data_node.get_last_edit().get(EDIT_TIMESTAMP_KEY)

edit_0 = data_node.edits[0]
assert len(edit_0) == 2
assert edit_0[EDIT_JOB_ID_KEY] == "job_1"
assert edit_0[EDIT_TIMESTAMP_KEY] >= before
assert edit_0[EDIT_TIMESTAMP_KEY] <= after

edit_1 = data_node.edits[1]
assert len(edit_1) == 2
assert edit_1[EDIT_EDITOR_ID_KEY] == "editor_1"
assert edit_1[EDIT_TIMESTAMP_KEY] >= before
assert edit_1[EDIT_TIMESTAMP_KEY] <= after

edit_2 = data_node.edits[2]
assert len(edit_2) == 2
assert edit_2[EDIT_COMMENT_KEY] == "This is a comment on this edit"
assert edit_2[EDIT_TIMESTAMP_KEY] >= before
assert edit_2[EDIT_TIMESTAMP_KEY] <= after

edit_3 = data_node.edits[3]
assert len(edit_3) == 3
assert edit_3[EDIT_EDITOR_ID_KEY] == "editor_2"
assert edit_3[EDIT_COMMENT_KEY] == "This is another comment on this edit"
assert edit_3[EDIT_TIMESTAMP_KEY] >= before
assert edit_3[EDIT_TIMESTAMP_KEY] <= after

edit_4 = data_node.edits[4]
assert len(edit_4) == 3
assert edit_4[EDIT_EDITOR_ID_KEY] == "editor_3"
assert edit_4["foo"] == "bar"
assert edit_4[EDIT_TIMESTAMP_KEY] >= before
assert edit_4[EDIT_TIMESTAMP_KEY] <= after

edit_5 = data_node.edits[5]
assert len(edit_5) == 1
assert edit_5[EDIT_TIMESTAMP_KEY] == timestamp
17 changes: 9 additions & 8 deletions tests/core/notification/test_events_published.py
Original file line number Diff line number Diff line change
Expand Up @@ -145,13 +145,13 @@ def test_events_published_for_writing_dn():
all_evts = RecordingConsumer(register_id_0, register_queue_0)
all_evts.start()

# Write input manually trigger 4 data node update events
# for last_edit_date, editor_id, editor_expiration_date and edit_in_progress
# Write input manually trigger 5 data node update events
# for last_edit_date, editor_id, editor_expiration_date, edit_in_progress and edits
scenario.the_input.write("test")
snapshot = all_evts.capture()
assert len(snapshot.collected_events) == 4
assert snapshot.entity_type_collected.get(EventEntityType.DATA_NODE, 0) == 4
assert snapshot.operation_collected.get(EventOperation.UPDATE, 0) == 4
assert len(snapshot.collected_events) == 5
assert snapshot.entity_type_collected.get(EventEntityType.DATA_NODE, 0) == 5
assert snapshot.operation_collected.get(EventOperation.UPDATE, 0) == 5
all_evts.stop()


Expand All @@ -178,22 +178,23 @@ def test_events_published_for_scenario_submission():
# 1 submission update event for is_completed
scenario.submit()
snapshot = all_evts.capture()
assert len(snapshot.collected_events) == 17
assert len(snapshot.collected_events) == 18
assert snapshot.entity_type_collected.get(EventEntityType.CYCLE, 0) == 0
assert snapshot.entity_type_collected.get(EventEntityType.DATA_NODE, 0) == 7
assert snapshot.entity_type_collected.get(EventEntityType.DATA_NODE, 0) == 8
assert snapshot.entity_type_collected.get(EventEntityType.TASK, 0) == 0
assert snapshot.entity_type_collected.get(EventEntityType.SEQUENCE, 0) == 0
assert snapshot.entity_type_collected.get(EventEntityType.SCENARIO, 0) == 1
assert snapshot.entity_type_collected.get(EventEntityType.JOB, 0) == 4
assert snapshot.entity_type_collected.get(EventEntityType.SUBMISSION, 0) == 5
assert snapshot.operation_collected.get(EventOperation.CREATION, 0) == 2
assert snapshot.operation_collected.get(EventOperation.UPDATE, 0) == 14
assert snapshot.operation_collected.get(EventOperation.UPDATE, 0) == 15
assert snapshot.operation_collected.get(EventOperation.SUBMISSION, 0) == 1

assert snapshot.attr_name_collected["last_edit_date"] == 1
assert snapshot.attr_name_collected["editor_id"] == 2
assert snapshot.attr_name_collected["editor_expiration_date"] == 2
assert snapshot.attr_name_collected["edit_in_progress"] == 2
assert snapshot.attr_name_collected["edits"] == 1
assert snapshot.attr_name_collected["status"] == 3
assert snapshot.attr_name_collected["jobs"] == 1
assert snapshot.attr_name_collected["submission_status"] == 3
Expand Down
Loading

0 comments on commit 4936044

Please sign in to comment.