Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Dataset usage tracking and purging of expired temporary datasets #503

Merged
merged 13 commits into from
Jan 30, 2024
Merged
Show file tree
Hide file tree
Changes from 9 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
199 changes: 149 additions & 50 deletions python/lib/core/dmod/core/dataset.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
from .enum import PydanticEnum
from typing import Any, Callable, ClassVar, Dict, FrozenSet, List, Optional, Set, Tuple, Type, Union
from pydantic import Field, validator, root_validator, PrivateAttr
from pydantic.fields import ModelField
from uuid import UUID, uuid4


Expand Down Expand Up @@ -135,8 +136,8 @@ def parse_dates(cls, v):
return datetime.strptime(v, cls.get_datetime_str_format())

@validator("created_on", "last_updated", "expires")
def drop_microseconds(cls, v: datetime):
return v.replace(microsecond=0)
def drop_microseconds(cls, v: datetime, field: ModelField):
return v.replace(microsecond=0) if (field.required or v is not None) else v

@validator("dataset_type")
def set_default_dataset_type(cls, value: Union[str, DatasetType] = None) -> DatasetType:
Expand Down Expand Up @@ -362,6 +363,15 @@ class DatasetUser(ABC):
This provides a more direct designation of something using one or more datasets.
"""

def __init__(self, datasets_and_managers: Optional[Dict[str, 'DatasetManager']] = None, *args, **kwargs):
    """
    Set up the mapping of in-use dataset names to the managers responsible for them.

    Parameters
    ----------
    datasets_and_managers : Optional[Dict[str, 'DatasetManager']]
        Optional initial mapping; when it is ``None`` or empty, a new empty dict is used instead.
    args
        Positional arguments; not used by this initializer.
    kwargs
        Keyword arguments; not used by this initializer.
    """
    # Any falsy argument (``None`` or an empty dict) is swapped for a brand new dict rather than stored.
    self._datasets_and_managers = datasets_and_managers or dict()

def __eq__(self, other) -> bool:
    """
    Compare this user to another object by UUID.

    Parameters
    ----------
    other
        The object to compare against.

    Returns
    -------
    bool
        Whether ``other`` exposes a ``uuid`` equal to this instance's ::attribute:`uuid`; ``NotImplemented`` is
        returned when ``other`` has no ``uuid`` attribute, so Python falls back to its default comparison rather
        than raising ``AttributeError``.
    """
    try:
        other_uuid = other.uuid
    except AttributeError:
        # Not something that can be compared by UUID (e.g., ``None`` or a plain string).
        return NotImplemented
    return self.uuid == other_uuid

def __hash__(self) -> int:
    """
    Hash this user by its UUID.

    Returns
    -------
    int
        The hash of this instance's ::attribute:`uuid`.
    """
    return hash(self.uuid)

@property
@abstractmethod
def uuid(self) -> UUID:
Expand All @@ -376,17 +386,16 @@ def uuid(self) -> UUID:
pass

@property
@abstractmethod
def datasets_in_use(self) -> Dict[UUID, str]:
def datasets_and_managers(self) -> Dict[str, 'DatasetManager']:
"""
A collection of datasets in used, keyed by UUID, with values being the dataset names.
A collection of associated managers of in-use datasets, keyed by the unique name of the in-use dataset.

Returns
-------
Dict[UUID, str]
A collection of datasets in used, keyed by UUID, with values being the dataset names.
Dict[str, 'DatasetManager']
A collection of associated managers of in-use datasets, keyed by the unique name of the in-use dataset.
"""
pass
return self._datasets_and_managers

def link_to_dataset(self, dataset: Dataset) -> bool:
"""
Expand All @@ -408,25 +417,7 @@ def link_to_dataset(self, dataset: Dataset) -> bool:
bool
Whether establishing the link was successful.
"""
if dataset.manager is not None and dataset.manager.link_user(user=self, dataset=dataset):
self.datasets_in_use[dataset.uuid] = dataset.name
self.linked_dataset_managers[dataset.uuid] = dataset.manager
return True
else:
return False

@property
@abstractmethod
def linked_dataset_managers(self) -> Dict[UUID, 'DatasetManager']:
"""
A collection of associated managers of in-use datasets, key by UUID of the in-use dataset.

Returns
-------
Dict[UUID, 'DatasetManager']
A collection of associated managers of in-use datasets, key by UUID of the in-use dataset.
"""
pass
return dataset.manager is not None and dataset.manager.link_user(user=self, dataset=dataset)

def unlink_to_dataset(self, dataset: Dataset) -> bool:
"""
Expand All @@ -435,21 +426,14 @@ def unlink_to_dataset(self, dataset: Dataset) -> bool:
Parameters
----------
dataset : Dataset
The used dataset.
The unlinked dataset.

Returns
-------
bool
Whether an established usage link was successful released.
Whether an established usage link was successfully released.
"""
if dataset.uuid not in self.datasets_in_use or dataset.uuid not in self.linked_dataset_managers:
return False
elif self.linked_dataset_managers[dataset.uuid].unlink_user(user=self, dataset=dataset):
self.datasets_in_use.pop(dataset.uuid)
self.linked_dataset_managers.pop(dataset.uuid)
return True
else:
return False
return dataset.manager is not None and dataset.manager.unlink_user(user=self, dataset=dataset)


class DatasetManager(ABC):
Expand Down Expand Up @@ -487,8 +471,10 @@ def get_serial_dataset_filename(cls, dataset_name: str) -> str:
def __init__(self, uuid: Optional[UUID] = None, datasets: Optional[Dict[str, Dataset]] = None, *args, **kwargs):
self._uuid = uuid4() if uuid is None else uuid
self._datasets = datasets if datasets is not None else dict()
self._dataset_users: Dict[str, Set[UUID]] = dict()
self._dataset_usage: Dict[str, Set[UUID]] = dict()
""" Collection of dataset names each keyed to a set of UUIDs of each user using the corresponding dataset. """
self._dataset_users: Dict[UUID, DatasetUser] = dict()
""" All linked dataset users, keyed by each user's UUID. """
self._errors = []
""" A property attribute to hold errors encountered during operations. """

Expand Down Expand Up @@ -568,6 +554,35 @@ def create(self, name: str, category: DataCategory, domain: DataDomain, is_read_
"""
pass

def create_temporary(self, expires_on: Optional[datetime] = None, **kwargs) -> Dataset:
    """
    Create a temporary dataset, which by default expires one day after the time of the call.

    This is a thin convenience wrapper around ::method:`create`. Its only job is to guarantee that a
    non-``None`` ``expires_on`` argument reaches ::method:`create`: an explicitly supplied value is passed through
    unchanged, and otherwise ``datetime.now() + timedelta(days=1)`` is used.

    Parameters
    ----------
    expires_on: Optional[datetime]
        Optional explicit expire time.
    kwargs
        Other keyword args forwarded to the nested call to ::method:`create`.

    Returns
    -------
    Dataset
        Whatever the nested call to ::method:`create` returns.

    See Also
    -------
    create
    """
    # The default is computed only when needed, so an explicit expiration never triggers a clock read.
    expiration = expires_on if expires_on is not None else datetime.now() + timedelta(days=1)
    return self.create(expires_on=expiration, **kwargs)
aaraney marked this conversation as resolved.
Show resolved Hide resolved

@abstractmethod
def delete(self, dataset: Dataset, **kwargs) -> bool:
"""
Expand Down Expand Up @@ -671,6 +686,41 @@ def filter(self, base_dataset: Dataset, restrictions: List[Union[ContinuousRestr
"""
pass

def get_dataset_user(self, uuid: UUID) -> DatasetUser:
    """
    Get the linked ::class:`DatasetUser` with the given uuid.

    An unknown UUID raises rather than returning ``None``: callers are expected to pass only UUIDs of users this
    manager is linked with, so a miss indicates a bug (e.g., a user that was not completely tracked when linked)
    and is surfaced immediately.

    Parameters
    ----------
    uuid : UUID
        The ::class:`DatasetUser.uuid` of the :class:`DatasetUser` of interest.

    Returns
    -------
    DatasetUser
        The linked ::class:`DatasetUser` with the given uuid.

    Raises
    -------
    ValueError
        If there is no linked user with the given UUID.
    """
    try:
        return self._dataset_users[uuid]
    except KeyError as e:
        # Chain explicitly so the traceback shows the failed lookup as the direct cause.
        raise ValueError(f"Manager {self.uuid!s} does not have linked dataset user {uuid!s}.") from e

def get_dataset_user_ids(self) -> FrozenSet[UUID]:
    """
    Get the UUIDs of every ::class:`DatasetUser` currently linked to this manager.

    Returns
    -------
    FrozenSet[UUID]
        An immutable snapshot of the keys of the manager's linked-user collection.
    """
    # Iterating a dict yields its keys, which here are the linked users' UUIDs.
    return frozenset(self._dataset_users)

@abstractmethod
def get_data(self, dataset_name: str, item_name: str, **kwargs) -> Union[bytes, Any]:
"""
Expand All @@ -695,9 +745,9 @@ def get_data(self, dataset_name: str, item_name: str, **kwargs) -> Union[bytes,
"""
pass

def get_dataset_users(self, dataset_name: str) -> FrozenSet[UUID]:
def get_user_ids_for_dataset(self, dataset_name: str) -> FrozenSet[UUID]:
"""
Get an immutable set of UUIDs for the linked users of a dataset.
Get an immutable set of UUIDs for the linked ::class:`DatasetUser` of a dataset.

Parameters
----------
Expand All @@ -707,9 +757,47 @@ def get_dataset_users(self, dataset_name: str) -> FrozenSet[UUID]:
Returns
-------
FrozenSet[UUID]
Immutable set of UUIDs for the linked users of a dataset.
Immutable set of UUIDs for the linked ::class:`DatasetUser` of a dataset.
"""
return frozenset(self._dataset_usage[dataset_name]) if dataset_name in self._dataset_usage else frozenset()

def is_managed_dataset(self, dataset: Dataset) -> bool:
    """
    Test whether the given dataset is managed by this manager, setting an unset manager reference if UUIDs match.

    The method returns ``True`` if and only if the following two conditions are true **at the end of execution of
    this method** (see special case below):
        - the given dataset's name is a key within the ::attribute:`datasets` property
        - the given dataset's ::attribute:`Dataset.manager` property is this instance

    Additionally, if and only if all the following are true, then the method will set ::attribute:`Dataset.manager`
    for the dataset to this instance, resulting in the aforementioned conditions for the method returning ``True``:
        - the dataset's name is a key within the ::attribute:`datasets` property
        - the dataset's ::attribute:`Dataset.manager` property is ``None``
        - the dataset's ::attribute:`Dataset.manager_uuid` property is equal to the UUID of this instance

    Parameters
    ----------
    dataset : Dataset
        The given dataset of interest.

    Returns
    -------
    bool
        ``True`` iff both of the following are true at the end of the method's execution (subject to the
        documented side effect):
            - the dataset's name is a key within the ::attribute:`datasets` property
            - the dataset's ::attribute:`Dataset.manager` property is this instance
    """
    # TODO: (later) ensure any dataset created, reloaded, or transformed/derived passes this method in all impls
    if dataset.name not in self.datasets:
        return False

    # Adopt a dataset whose manager reference is unset but whose recorded manager UUID is this manager's.
    if dataset.manager is None and self.uuid == dataset.manager_uuid:
        dataset.manager = self

    # Identity check: a dataset that references some other manager is not managed here.
    return dataset.manager is self

def link_user(self, user: DatasetUser, dataset: Dataset) -> bool:
"""
Expand All @@ -728,10 +816,12 @@ def link_user(self, user: DatasetUser, dataset: Dataset) -> bool:
Whether the link was successful.
"""
if dataset.name not in self.datasets:
raise RuntimeError("Cannot link user {} to unknown dataset {}".format(user.uuid, dataset.name))
if dataset.name not in self._dataset_users:
self._dataset_users[dataset.name] = set()
self._dataset_users[dataset.name].add(user.uuid)
raise RuntimeError(f"Cannot link user {user.uuid!s} to unknown dataset {dataset.name}")
aaraney marked this conversation as resolved.
Show resolved Hide resolved
if dataset.name not in self._dataset_usage:
self._dataset_usage[dataset.name] = set()
self._dataset_usage[dataset.name].add(user.uuid)
self._dataset_users[user.uuid] = user
user.datasets_and_managers[dataset.name] = self
return True

@abstractmethod
Expand Down Expand Up @@ -843,12 +933,21 @@ def unlink_user(self, user: DatasetUser, dataset: Dataset) -> bool:
bool
Whether the usage link was successfully unlinked.
"""
if dataset.name not in self._dataset_users or user.uuid not in self._dataset_users[dataset.name]:
if dataset.name not in user.datasets_and_managers:
return False
if dataset.name not in self._dataset_usage or user.uuid not in self._dataset_usage[dataset.name]:
return False
elif len(self._dataset_users[dataset.name]) == 1:
self._dataset_users.pop(dataset.name)

if len(self._dataset_usage[dataset.name]) == 1:
self._dataset_usage.pop(dataset.name)
else:
self._dataset_users[dataset.name].remove(user.uuid)
self._dataset_usage[dataset.name].remove(user.uuid)

if not any([user.uuid in self._dataset_usage[ds_name] for ds_name in self._dataset_usage]):
self._dataset_users.pop(user.uuid)
aaraney marked this conversation as resolved.
Show resolved Hide resolved

user.datasets_and_managers.pop(dataset.name)

return True

@property
Expand Down
Loading
Loading