From 17affeccbf469504b6e1968efaf2ef0cee54c64b Mon Sep 17 00:00:00 2001 From: Koen Vossen Date: Mon, 16 Sep 2024 22:20:15 +0200 Subject: [PATCH] WIP --- ingestify/application/dataset_store.py | 16 +- ingestify/application/loader.py | 123 ++++++++------ ingestify/domain/models/dataset/identifier.py | 39 ++--- ingestify/domain/models/fetch_policy.py | 30 ++-- .../models/resources/dataset_resource.py | 49 ++++-- ingestify/domain/models/source.py | 9 - ingestify/exceptions.py | 4 + .../store/dataset/sqlalchemy/repository.py | 2 - ingestify/tests/test_engine.py | 154 ++++++++++-------- 9 files changed, 247 insertions(+), 179 deletions(-) diff --git a/ingestify/application/dataset_store.py b/ingestify/application/dataset_store.py index f7a5a68..e2032aa 100644 --- a/ingestify/application/dataset_store.py +++ b/ingestify/application/dataset_store.py @@ -9,6 +9,7 @@ from typing import Dict, List, Optional, Union, Callable, BinaryIO +from ingestify.domain.models.dataset.dataset import DatasetState from ingestify.domain.models.dataset.events import RevisionAdded, MetadataUpdated from ingestify.domain.models.dataset.file_collection import FileCollection from ingestify.domain.models.event import EventBus @@ -209,9 +210,9 @@ def update_dataset( ): """The add_revision will also save the dataset.""" metadata_changed = False - if dataset.update_from_identifier(dataset_identifier): - self.dataset_repository.save(bucket=self.bucket, dataset=dataset) - metadata_changed = True + # if dataset.update_from_identifier(dataset_identifier): + # self.dataset_repository.save(bucket=self.bucket, dataset=dataset) + # metadata_changed = True self.add_revision(dataset, files) @@ -229,6 +230,9 @@ def create_dataset( dataset_type: str, provider: str, dataset_identifier: Identifier, + name: str, + state: DatasetState, + metadata: dict, files: Dict[str, DraftFile], description: str = "Create", ): @@ -237,12 +241,12 @@ def create_dataset( dataset = Dataset( bucket=self.bucket, dataset_id=self.dataset_repository.next_identity(), - name=dataset_identifier.name, - state=dataset_identifier.state, + name=name, + state=state, identifier=dataset_identifier, dataset_type=dataset_type, provider=provider, - metadata=dataset_identifier.metadata, + metadata=metadata, created_at=now, updated_at=now, ) diff --git a/ingestify/application/loader.py b/ingestify/application/loader.py index 0e9fe15..4131f3e 100644 --- a/ingestify/application/loader.py +++ b/ingestify/application/loader.py @@ -1,15 +1,19 @@ import itertools +import json import logging import platform from multiprocessing import set_start_method, cpu_count -from typing import List +from typing import List, Optional from ingestify.domain.models import Dataset, Identifier, Selector, Source, Task, TaskSet from ingestify.utils import map_in_pool, TaskExecutor, chunker from .dataset_store import DatasetStore +from .. 
import DatasetResource, retrieve_http +from ..domain import DraftFile from ..domain.models.data_spec_version_collection import DataSpecVersionCollection from ..domain.models.extract_job import ExtractJob +from ..domain.models.resources.dataset_resource import FileResource from ..exceptions import ConfigurationError if platform.system() == "Darwin": @@ -39,69 +43,92 @@ def to_batches(input_): return batches +def load_file( + file_resource: FileResource, dataset: Optional[Dataset] = None +) -> Optional[DraftFile]: + current_file = None + if dataset: + current_file = dataset.current_revision.modified_files_map.get( + file_resource.file_id + ) + + if file_resource.json_content: + return DraftFile.from_input( + file_=json.dumps(file_resource.json_content, indent=4), + data_serialization_format="json", + data_feed_key=file_resource.data_feed_key, + data_spec_version=file_resource.data_spec_version, + modified_at=file_resource.last_modified, + ) + elif file_resource.url: + http_options = {} + if file_resource.http_options: + for k, v in file_resource.http_options.items(): + http_options[f"http_{k}"] = v + + return retrieve_http( + url=file_resource.url, + current_file=current_file, + file_data_feed_key=file_resource.data_feed_key, + file_data_spec_version=file_resource.data_spec_version, + file_data_serialization_format=file_resource.data_serialization_format + or "txt", + **http_options, + ) + else: + return file_resource.file_loader(file_resource, current_file) + + class UpdateDatasetTask(Task): def __init__( self, - source: Source, dataset: Dataset, - dataset_identifier: Identifier, - data_spec_versions: DataSpecVersionCollection, + dataset_resource: DatasetResource, store: DatasetStore, ): - self.source = source self.dataset = dataset - self.dataset_identifier = dataset_identifier - self.data_spec_versions = data_spec_versions + self.dataset_resource = dataset_resource self.store = store def run(self): - files = self.source.fetch_dataset_files( - self.dataset.dataset_type, - self.dataset_identifier, # Use the new dataset_identifier as it's more up-to-date, and contains more info - data_spec_versions=self.data_spec_versions, - current_revision=self.dataset.current_revision, - ) self.store.update_dataset( dataset=self.dataset, - dataset_identifier=self.dataset_identifier, - files=files, + dataset_identifier=Identifier(self.dataset_resource.dataset_resource_id), + files={ + file_id: load_file(file_resource, dataset=self.dataset) + for file_id, file_resource in self.dataset_resource.files.items() + }, ) def __repr__(self): - return f"UpdateDatasetTask({self.source} -> {self.dataset.identifier})" + return f"UpdateDatasetTask({self.dataset_resource.provider} -> {self.dataset_resource.dataset_resource_id})" class CreateDatasetTask(Task): def __init__( self, - source: Source, - dataset_type: str, - data_spec_versions: DataSpecVersionCollection, - dataset_identifier: Identifier, + dataset_resource: DatasetResource, store: DatasetStore, ): - self.source = source - self.dataset_type = dataset_type - self.data_spec_versions = data_spec_versions - self.dataset_identifier = dataset_identifier + self.dataset_resource = dataset_resource self.store = store def run(self): - files = self.source.fetch_dataset_files( - dataset_type=self.dataset_type, - identifier=self.dataset_identifier, - data_spec_versions=self.data_spec_versions, - current_revision=None, - ) self.store.create_dataset( - dataset_type=self.dataset_type, - provider=self.source.provider, - dataset_identifier=self.dataset_identifier, - files=files, 
+ dataset_type=self.dataset_resource.dataset_type, + provider=self.dataset_resource.provider, + dataset_identifier=Identifier(**self.dataset_resource.dataset_resource_id), + name=self.dataset_resource.name, + state=self.dataset_resource.state, + metadata=self.dataset_resource.metadata, + files={ + file_id: load_file(file_resource) + for file_id, file_resource in self.dataset_resource.files.items() + }, ) def __repr__(self): - return f"CreateDatasetTask({self.source} -> {self.dataset_identifier})" + return f"CreateDatasetTask({self.dataset_resource.provider} -> {self.dataset_resource.dataset_resource_id})" class Loader: @@ -204,7 +231,9 @@ def run_task(task): for batch in batches: dataset_identifiers = [ - Identifier.create_from(selector, **dataset_resource.dataset_resource_id) + Identifier.create_from_selector( + selector, **dataset_resource.dataset_resource_id + ) # We have to pass the data_spec_versions here as a Source can add some # extra data to the identifier which is retrieved in a certain data format for dataset_resource in batch @@ -213,7 +242,8 @@ def run_task(task): # Load all available datasets based on the discovered dataset identifiers dataset_collection = self.store.get_dataset_collection( dataset_type=extract_job.dataset_type, - provider=extract_job.source.provider, + # Assume all DatasetResources share the same provider + provider=batch[0].provider, selector=dataset_identifiers, ) @@ -221,30 +251,29 @@ def run_task(task): total_dataset_count += len(dataset_identifiers) task_set = TaskSet() - for dataset_identifier in dataset_identifiers: + for dataset_resource in batch: + dataset_identifier = Identifier.create_from_selector( + selector, **dataset_resource.dataset_resource_id + ) + if dataset := dataset_collection.get(dataset_identifier): if extract_job.fetch_policy.should_refetch( - dataset, dataset_identifier + dataset, dataset_resource ): task_set.add( UpdateDatasetTask( - source=extract_job.source, dataset=dataset, # Current dataset from the database - dataset_identifier=dataset_identifier, # Most recent dataset_identifier - data_spec_versions=selector.data_spec_versions, + dataset_resource=dataset_resource, # Most recent dataset_resource store=self.store, ) ) else: skip_count += 1 else: - if extract_job.fetch_policy.should_fetch(dataset_identifier): + if extract_job.fetch_policy.should_fetch(dataset_resource): task_set.add( CreateDatasetTask( - source=extract_job.source, - dataset_type=extract_job.dataset_type, - dataset_identifier=dataset_identifier, - data_spec_versions=selector.data_spec_versions, + dataset_resource=dataset_resource, store=self.store, ) ) diff --git a/ingestify/domain/models/dataset/identifier.py b/ingestify/domain/models/dataset/identifier.py index 5789af0..750f05d 100644 --- a/ingestify/domain/models/dataset/identifier.py +++ b/ingestify/domain/models/dataset/identifier.py @@ -1,33 +1,24 @@ -from datetime import datetime -from typing import Optional, TYPE_CHECKING, Dict +from typing import TYPE_CHECKING -from ingestify.utils import AttributeBag +from ingestify.utils import key_from_dict if TYPE_CHECKING: - from ingestify.domain.models.dataset.dataset import DatasetState + from ingestify.domain import Selector -class Identifier(AttributeBag): - @property - def last_modified(self) -> Optional[datetime]: - return self.attributes.get("_last_modified") - - @property - def name(self) -> Optional[str]: - return self.attributes.get("_name") +class Identifier(dict): + @classmethod + def create_from_selector(cls, selector: "Selector", **kwargs): + 
identifier = cls(**selector.filtered_attributes) + identifier.update(kwargs) + return identifier @property - def metadata(self) -> dict: - return self.attributes.get("_metadata", {}) + def key(self): + return key_from_dict(self) - @property - def state(self) -> "DatasetState": - from ingestify.domain.models.dataset.dataset import DatasetState - - return self.attributes.get("_state", DatasetState.SCHEDULED) + def __hash__(self): + return hash(self.key) - @property - def files_last_modified(self) -> Optional[Dict[str, datetime]]: - """Return last modified per file. This makes it possible to detect when a file is added with an older - last_modified than current dataset.""" - return self.attributes.get("_files_last_modified") + def __str__(self): + return "/".join([f"{k}={v}" for k, v in self.items()]) diff --git a/ingestify/domain/models/fetch_policy.py b/ingestify/domain/models/fetch_policy.py index d0089b4..3856b8e 100644 --- a/ingestify/domain/models/fetch_policy.py +++ b/ingestify/domain/models/fetch_policy.py @@ -1,6 +1,6 @@ from datetime import timedelta -from ingestify.domain import Dataset, Identifier +from ingestify.domain import Dataset, Identifier, DatasetResource from ingestify.utils import utcnow @@ -10,25 +10,31 @@ def __init__(self): self.min_age = utcnow() - timedelta(days=2) self.last_change = utcnow() - timedelta(days=1) - def should_fetch(self, dataset_identifier: Identifier) -> bool: + def should_fetch(self, dataset_resource: DatasetResource) -> bool: # this is called when dataset does not exist yet return True - def should_refetch(self, dataset: Dataset, identifier: Identifier) -> bool: + def should_refetch( + self, dataset: Dataset, dataset_resource: DatasetResource + ) -> bool: current_revision = dataset.current_revision if not dataset.revisions: # TODO: this is weird? Dataset without any data. Fetch error? 
return True elif current_revision: - if identifier.files_last_modified: - if current_revision.is_changed(identifier.files_last_modified): - return True + files_last_modified = { + file.file_id: file.last_modified + for file in dataset_resource.files.values() + } + if current_revision.is_changed(files_last_modified): + return True - else: - if ( - identifier.last_modified - and current_revision.created_at < identifier.last_modified - ): - return True + # We don't set last_modified on Dataset level anymore, only on file level + # else: + # if ( + # identifier.last_modified + # and current_revision.created_at < identifier.last_modified + # ): + # return True return False diff --git a/ingestify/domain/models/resources/dataset_resource.py b/ingestify/domain/models/resources/dataset_resource.py index 124cc2e..b447b6e 100644 --- a/ingestify/domain/models/resources/dataset_resource.py +++ b/ingestify/domain/models/resources/dataset_resource.py @@ -9,6 +9,7 @@ @dataclass(frozen=True) class FileResource: + dataset_resource: "DatasetResource" file_id: str last_modified: datetime data_feed_key: str @@ -17,46 +18,70 @@ class FileResource: # DataSerializationFormat is "json" in case of json_content, otherwise file_loader will return it # data_serialization_format: str - json_content: Optional[str] = None + json_content: Optional[dict] = None + + url: Optional[str] = None + http_options: Optional[dict] = None + data_serialization_format: Optional[str] = None + file_loader: Optional[ - Callable[ - ["DatasetResource", "FileResource", Optional["File"]], Optional["DraftFile"] - ] + Callable[["FileResource", Optional["File"]], Optional["DraftFile"]] ] = None class DatasetResource: def __init__( - self, dataset_resource_id: dict, name: str, metadata: dict, state: "DatasetState" + self, + dataset_resource_id: dict, + /, + dataset_type: str, + provider: str, + name: str, + metadata: Optional[dict] = None, + state: Optional["DatasetState"] = None, ): + from ingestify.domain.models.dataset.dataset import DatasetState + + self.dataset_type = dataset_type + self.provider = provider self.dataset_resource_id = dataset_resource_id self.name = name - self.metadata = metadata - self.state = state + self.metadata = metadata or {} + self.state = state or DatasetState.COMPLETE - self._files = {} + self.files = {} - def add_file_resource( + def add_file( self, last_modified: datetime, data_feed_key: str, data_spec_version: str, - json_content: Optional[str] = None, + json_content: Optional[dict] = None, + url: Optional[str] = None, + http_options: Optional[dict] = None, + data_serialization_format: Optional[str] = None, file_loader: Optional[ Callable[ - ["DatasetResource", "FileResource", Optional["File"]], Optional["DraftFile"] + ["FileResource", Optional["File"]], + Optional["DraftFile"], ] ] = None, ): file_id = f"{data_feed_key}__{data_spec_version}" + if file_id in self.files: + raise DuplicateFile(f"File with id {file_id} already exists.") file_resource = FileResource( + dataset_resource=self, file_id=file_id, data_feed_key=data_feed_key, data_spec_version=data_spec_version, last_modified=last_modified, json_content=json_content, + url=url, + http_options=http_options, + data_serialization_format=data_serialization_format, file_loader=file_loader, ) - self._files[file_id] = file_resource + self.files[file_id] = file_resource diff --git a/ingestify/domain/models/source.py b/ingestify/domain/models/source.py index 15588f5..3dd6f6c 100644 --- a/ingestify/domain/models/source.py +++ b/ingestify/domain/models/source.py @@ 
-1,11 +1,7 @@ from abc import ABC, abstractmethod from typing import Dict, List, Optional, Iterable, Iterator, Union -# from ingestify.utils import ComponentFactory, ComponentRegistry - -from . import DraftFile from .data_spec_version_collection import DataSpecVersionCollection -from .dataset import Identifier, Revision from .dataset.collection_metadata import DatasetCollectionMetadata from .resources.dataset_resource import DatasetResource @@ -14,11 +10,6 @@ class Source(ABC): def __init__(self, name: str, **kwargs): self.name = name - @property - @abstractmethod - def provider(self) -> str: - pass - # TODO: consider making this required... # @abstractmethod # def discover_selectors(self, dataset_type: str) -> List[Dict]: diff --git a/ingestify/exceptions.py b/ingestify/exceptions.py index 23664bd..7a9e314 100644 --- a/ingestify/exceptions.py +++ b/ingestify/exceptions.py @@ -4,3 +4,7 @@ class IngestifyError(Exception): class ConfigurationError(IngestifyError): pass + + +class DuplicateFile(IngestifyError): + pass diff --git a/ingestify/infra/store/dataset/sqlalchemy/repository.py b/ingestify/infra/store/dataset/sqlalchemy/repository.py index cd35a0a..45ee720 100644 --- a/ingestify/infra/store/dataset/sqlalchemy/repository.py +++ b/ingestify/infra/store/dataset/sqlalchemy/repository.py @@ -30,8 +30,6 @@ def parse_value(v): def json_serializer(o): - if isinstance(o, Identifier): - o = o.filtered_attributes return json.dumps(o) diff --git a/ingestify/tests/test_engine.py b/ingestify/tests/test_engine.py index 6c3855b..e2d103e 100644 --- a/ingestify/tests/test_engine.py +++ b/ingestify/tests/test_engine.py @@ -3,7 +3,7 @@ import pytz -from ingestify import Source +from ingestify import Source, DatasetResource from ingestify.application.ingestion_engine import IngestionEngine from ingestify.domain import ( Identifier, @@ -12,6 +12,9 @@ DraftFile, Revision, ) +from ingestify.domain.models.dataset.collection_metadata import ( + DatasetCollectionMetadata, +) from ingestify.domain.models.extract_job import ExtractJob from ingestify.domain.models.fetch_policy import FetchPolicy from ingestify.main import get_engine @@ -31,49 +34,72 @@ def add_extract_job(engine: IngestionEngine, source: Source, **selector): ) -class SimpleFakeSource(Source): - @property - def provider(self) -> str: - return "fake" +def file_loader(file_resource, current_file): + if file_resource.file_id == "file1__v1": + if not current_file: + return DraftFile.from_input( + "content1", + ) + else: + return DraftFile.from_input( + "different_content", + ) - def discover_datasets( + elif file_resource.file_id == "file2__v1": + return DraftFile.from_input( + "some_content" + str(file_resource.dataset_resource.dataset_resource_id) + ) + + +class SimpleFakeSource(Source): + def find_datasets( self, dataset_type: str, data_spec_versions: DataSpecVersionCollection, + dataset_collection_metadata: DatasetCollectionMetadata, competition_id, season_id, **kwargs ): - return [ + dataset_resource = DatasetResource( dict( competition_id=competition_id, season_id=season_id, - _name="Test Dataset", - _last_modified=datetime.now(pytz.utc), - ) - ] + ), + provider="fake", + dataset_type="match", + name="Test Dataset", + ) - def fetch_dataset_files( - self, - dataset_type: str, - identifier: Identifier, - data_spec_versions: DataSpecVersionCollection, - current_revision: Optional[Revision], - ): - if current_revision: - return { - "file1": DraftFile.from_input( - "different_content", - ), - "file2": DraftFile.from_input("some_content" + 
identifier.key), - } - else: - return { - "file1": DraftFile.from_input( - "content1", - ), - "file2": DraftFile.from_input("some_content" + identifier.key), - } + last_modified = datetime.now(pytz.utc) + + dataset_resource.add_file( + last_modified=last_modified, + data_feed_key="file1", + data_spec_version="v1", + file_loader=file_loader, + ) + dataset_resource.add_file( + last_modified=last_modified, + data_feed_key="file2", + data_spec_version="v1", + file_loader=file_loader, + ) + dataset_resource.add_file( + last_modified=last_modified, + data_feed_key="file3", + data_spec_version="v1", + json_content={"test": "some-content"}, + ) + # dataset_resource.add_file( + # last_modified=last_modified, + # data_feed_key="file4", + # data_spec_version="v1", + # url="https://raw.githubusercontent.com/statsbomb/open-data/refs/heads/master/data/three-sixty/3788741.json", + # data_serialization_format="json" + # ) + + yield dataset_resource class BatchSource(Source): @@ -83,14 +109,11 @@ def __init__(self, name, callback): self.should_stop = False self.idx = 0 - @property - def provider(self) -> str: - return "fake" - - def discover_datasets( + def find_datasets( self, dataset_type: str, data_spec_versions: DataSpecVersionCollection, + dataset_collection_metadata: DatasetCollectionMetadata, competition_id, season_id, **kwargs @@ -100,39 +123,36 @@ def discover_datasets( for i in range(10): match_id = self.idx self.idx += 1 - item = dict( - competition_id=competition_id, - season_id=season_id, - match_id=match_id, - _name="Test Dataset", - _last_modified=datetime.now(pytz.utc), + dataset_resource = DatasetResource( + dict( + competition_id=competition_id, + season_id=season_id, + match_id=match_id, + ), + name="Test dataset", + provider="fake", + dataset_type="match", + ) + + last_modified = datetime.now(pytz.utc) + + dataset_resource.add_file( + last_modified=last_modified, + data_feed_key="file1", + data_spec_version="v1", + file_loader=file_loader, ) - items.append(item) + dataset_resource.add_file( + last_modified=last_modified, + data_feed_key="file2", + data_spec_version="v1", + file_loader=file_loader, + ) + + items.append(dataset_resource) yield items self.callback and self.callback(self.idx) - def fetch_dataset_files( - self, - dataset_type: str, - identifier: Identifier, - data_spec_versions: DataSpecVersionCollection, - current_revision: Optional[Revision], - ): - if current_revision: - return { - "file1": DraftFile.from_input( - "different_content", - ), - "file2": DraftFile.from_input("some_content" + identifier.key), - } - else: - return { - "file1": DraftFile.from_input( - "content1", - ), - "file2": DraftFile.from_input("some_content" + identifier.key), - } - def test_engine(config_file): engine = get_engine(config_file, "main") @@ -155,7 +175,7 @@ def test_engine(config_file): dataset = datasets.first() assert dataset.identifier == Identifier(competition_id=1, season_id=2) assert len(dataset.revisions) == 2 - assert len(dataset.revisions[0].modified_files) == 2 + assert len(dataset.revisions[0].modified_files) == 3 assert len(dataset.revisions[1].modified_files) == 1 add_extract_job(
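
Taken together, the patch moves provider, name, state, and metadata off the Identifier and onto DatasetResource, and replaces discover_datasets/fetch_dataset_files with a single find_datasets that declares files up front via add_file. As a rough sketch of what a Source looks like against this new surface (illustrative only: the source name, ids, and URL below are made up, and the keyword names are taken from the test and loader changes in this patch), a minimal find_datasets might read:

from datetime import datetime, timezone

from ingestify import Source, DatasetResource


class ExampleSource(Source):
    def find_datasets(
        self,
        dataset_type,
        data_spec_versions,
        dataset_collection_metadata,
        competition_id,
        season_id,
        **kwargs,
    ):
        dataset_resource = DatasetResource(
            # dataset_resource_id dict; merged with the selector into an Identifier
            dict(competition_id=competition_id, season_id=season_id, match_id=123),
            provider="example",         # provider now lives on the resource,
            dataset_type=dataset_type,  # not on the Source
            name="Example match",
        )

        # Inline JSON content is serialized by load_file() via json.dumps
        dataset_resource.add_file(
            last_modified=datetime.now(timezone.utc),
            data_feed_key="lineup",
            data_spec_version="v1",
            json_content={"home": [], "away": []},
        )

        # URL-based files are fetched through retrieve_http(); any http_options
        # are passed through as http_* keyword arguments (hypothetical URL)
        dataset_resource.add_file(
            last_modified=datetime.now(timezone.utc),
            data_feed_key="events",
            data_spec_version="v1",
            url="https://example.com/events/123.json",
            data_serialization_format="json",
        )

        yield dataset_resource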