From d980dc5c707f39418a7e3bbf1d378139108cf330 Mon Sep 17 00:00:00 2001 From: Koen Vossen Date: Mon, 16 Sep 2024 09:58:06 +0200 Subject: [PATCH 1/4] WIP --- ingestify/__init__.py | 2 +- ingestify/application/loader.py | 32 +++++++--- ingestify/domain/models/__init__.py | 2 + ingestify/domain/models/resources/__init__.py | 1 + .../models/resources/dataset_resource.py | 62 +++++++++++++++++++ ingestify/domain/models/source.py | 15 +---- ingestify/source_base.py | 2 + ingestify/utils.py | 9 +++ 8 files changed, 104 insertions(+), 21 deletions(-) create mode 100644 ingestify/domain/models/resources/__init__.py create mode 100644 ingestify/domain/models/resources/dataset_resource.py diff --git a/ingestify/__init__.py b/ingestify/__init__.py index 47a584b..b0ddbfc 100644 --- a/ingestify/__init__.py +++ b/ingestify/__init__.py @@ -6,6 +6,6 @@ if not __INGESTIFY_SETUP__: from .infra import retrieve_http - from .source_base import Source + from .source_base import Source, DatasetResource __version__ = "0.0.5" diff --git a/ingestify/application/loader.py b/ingestify/application/loader.py index c3532b1..0e9fe15 100644 --- a/ingestify/application/loader.py +++ b/ingestify/application/loader.py @@ -1,10 +1,11 @@ +import itertools import logging import platform from multiprocessing import set_start_method, cpu_count from typing import List from ingestify.domain.models import Dataset, Identifier, Selector, Source, Task, TaskSet -from ingestify.utils import map_in_pool, TaskExecutor +from ingestify.utils import map_in_pool, TaskExecutor, chunker from .dataset_store import DatasetStore from ..domain.models.data_spec_version_collection import DataSpecVersionCollection @@ -20,6 +21,24 @@ logger = logging.getLogger(__name__) +DEFAULT_CHUNK_SIZE = 100 + + +def to_batches(input_): + if isinstance(input_, list): + batches = [input_] + else: + # Assume it's an iterator. Peek what's inside, and put it back + peek = next(input_) + input_ = itertools.chain([peek], input_) + + if not isinstance(peek, list): + batches = chunker(input_, DEFAULT_CHUNK_SIZE) + else: + batches = input_ + return batches + + class UpdateDatasetTask(Task): def __init__( self, @@ -174,24 +193,21 @@ def run_task(task): # There are two different, but similar flows here: # 1. The discover_datasets returns a list, and the entire list can be processed at once # 2. 
The discover_datasets returns an iterator of batches, in this case we need to process each batch - discovered_datasets = extract_job.source.discover_datasets( + datasets = extract_job.source.find_datasets( dataset_type=extract_job.dataset_type, data_spec_versions=selector.data_spec_versions, dataset_collection_metadata=dataset_collection_metadata, **selector.filtered_attributes, ) - if isinstance(discovered_datasets, list): - batches = [discovered_datasets] - else: - batches = discovered_datasets + batches = to_batches(datasets) for batch in batches: dataset_identifiers = [ - Identifier.create_from(selector, **identifier) + Identifier.create_from(selector, **dataset_resource.dataset_resource_id) # We have to pass the data_spec_versions here as a Source can add some # extra data to the identifier which is retrieved in a certain data format - for identifier in batch + for dataset_resource in batch ] # Load all available datasets based on the discovered dataset identifiers diff --git a/ingestify/domain/models/__init__.py b/ingestify/domain/models/__init__.py index 9cad0d4..928a0b3 100644 --- a/ingestify/domain/models/__init__.py +++ b/ingestify/domain/models/__init__.py @@ -18,6 +18,7 @@ from .source import Source from .task import Task, TaskSet from .data_spec_version_collection import DataSpecVersionCollection +from .resources import DatasetResource __all__ = [ "Selector", @@ -26,6 +27,7 @@ "Revision", "Dataset", "DatasetCollection", + "DatasetResource", "File", "DraftFile", "DatasetCreated", diff --git a/ingestify/domain/models/resources/__init__.py b/ingestify/domain/models/resources/__init__.py new file mode 100644 index 0000000..755f4a5 --- /dev/null +++ b/ingestify/domain/models/resources/__init__.py @@ -0,0 +1 @@ +from .dataset_resource import DatasetResource diff --git a/ingestify/domain/models/resources/dataset_resource.py b/ingestify/domain/models/resources/dataset_resource.py new file mode 100644 index 0000000..124cc2e --- /dev/null +++ b/ingestify/domain/models/resources/dataset_resource.py @@ -0,0 +1,62 @@ +from dataclasses import dataclass +from datetime import datetime +from typing import Optional, Callable, TYPE_CHECKING + +if TYPE_CHECKING: + from ingestify.domain import DraftFile, File + from ingestify.domain.models.dataset.dataset import DatasetState + + +@dataclass(frozen=True) +class FileResource: + file_id: str + last_modified: datetime + data_feed_key: str + data_spec_version: str + + # DataSerializationFormat is "json" in case of json_content, otherwise file_loader will return it + # data_serialization_format: str + + json_content: Optional[str] = None + file_loader: Optional[ + Callable[ + ["DatasetResource", "FileResource", Optional["File"]], Optional["DraftFile"] + ] + ] = None + + +class DatasetResource: + def __init__( + self, dataset_resource_id: dict, name: str, metadata: dict, state: "DatasetState" + ): + self.dataset_resource_id = dataset_resource_id + self.name = name + self.metadata = metadata + self.state = state + + self._files = {} + + def add_file_resource( + self, + last_modified: datetime, + data_feed_key: str, + data_spec_version: str, + json_content: Optional[str] = None, + file_loader: Optional[ + Callable[ + ["DatasetResource", "FileResource", Optional["File"]], Optional["DraftFile"] + ] + ] = None, + ): + file_id = f"{data_feed_key}__{data_spec_version}" + + file_resource = FileResource( + file_id=file_id, + data_feed_key=data_feed_key, + data_spec_version=data_spec_version, + last_modified=last_modified, + json_content=json_content, + 
file_loader=file_loader, + ) + + self._files[file_id] = file_resource diff --git a/ingestify/domain/models/source.py b/ingestify/domain/models/source.py index 1780670..15588f5 100644 --- a/ingestify/domain/models/source.py +++ b/ingestify/domain/models/source.py @@ -7,6 +7,7 @@ from .data_spec_version_collection import DataSpecVersionCollection from .dataset import Identifier, Revision from .dataset.collection_metadata import DatasetCollectionMetadata +from .resources.dataset_resource import DatasetResource class Source(ABC): @@ -24,23 +25,13 @@ def provider(self) -> str: # pass @abstractmethod - def discover_datasets( + def find_datasets( self, dataset_type: str, data_spec_versions: DataSpecVersionCollection, dataset_collection_metadata: DatasetCollectionMetadata, **kwargs - ) -> Union[List[Dict], Iterator[List[Dict]]]: - pass - - @abstractmethod - def fetch_dataset_files( - self, - dataset_type: str, - identifier: Identifier, - data_spec_versions: DataSpecVersionCollection, - current_revision: Optional[Revision], - ) -> Dict[str, Optional[DraftFile]]: + ) -> Iterator[List[DatasetResource]]: pass def __repr__(self): diff --git a/ingestify/source_base.py b/ingestify/source_base.py index 4893b17..2f49349 100644 --- a/ingestify/source_base.py +++ b/ingestify/source_base.py @@ -1,6 +1,7 @@ from ingestify.application.dataset_store import DatasetStore from ingestify.domain.models import ( Dataset, + DatasetResource, DraftFile, File, Identifier, @@ -15,6 +16,7 @@ "Source", "DatasetStore", "Dataset", + "DatasetResource", "Revision", "File", "DraftFile", diff --git a/ingestify/utils.py b/ingestify/utils.py index 1333f46..d5d4742 100644 --- a/ingestify/utils.py +++ b/ingestify/utils.py @@ -13,6 +13,15 @@ from typing_extensions import Self +from itertools import islice + + +def chunker(it, size): + iterator = iter(it) + while chunk := list(islice(iterator, size)): + yield chunk + + def sanitize_exception_message(exception_message): """ Sanitizes an exception message by removing any sensitive information such as passwords. 
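Note for reviewers: to make the new contract concrete, below is a minimal sketch of a Source written against the API as it stands after this first patch. Everything source-specific in it is invented for illustration (the FakeMatchSource name, the competition_id/season_id selector attributes, the match ids, the file payload); the ingestify types and call signatures are taken from this series.

# Illustrative sketch only, not part of the patch: a fake Source implementing
# the find_datasets() contract introduced above.
from datetime import datetime, timezone
from typing import Iterator, List

from ingestify import DatasetResource, Source
from ingestify.domain.models.dataset.dataset import DatasetState


class FakeMatchSource(Source):
    @property
    def provider(self) -> str:
        # Still an abstract property on Source at this point in the series.
        return "fake"

    def find_datasets(
        self,
        dataset_type: str,
        data_spec_versions,
        dataset_collection_metadata,
        competition_id,
        season_id,
        **kwargs,
    ) -> Iterator[List[DatasetResource]]:
        # Yield batches (lists) of DatasetResource objects. The loader's
        # to_batches() also accepts a plain list, or an iterator of single
        # resources, which it chunks into DEFAULT_CHUNK_SIZE-sized lists
        # via the new chunker() helper in ingestify/utils.py.
        batch = []
        for match_id in (1, 2):  # invented identifiers, for illustration
            dataset_resource = DatasetResource(
                dict(
                    competition_id=competition_id,
                    season_id=season_id,
                    match_id=match_id,
                ),
                name=f"Match {match_id}",
                metadata={},
                state=DatasetState.COMPLETE,  # patch 2 later makes this the default
            )
            # json_content is typed as str at this point in the series.
            dataset_resource.add_file_resource(
                last_modified=datetime.now(timezone.utc),
                data_feed_key="events",
                data_spec_version="v1",
                json_content='{"events": []}',
            )
            batch.append(dataset_resource)
        yield batch

Later patches refine this surface: patch 2 moves provider and dataset_type onto DatasetResource, renames add_file_resource() to add_file(), makes json_content a dict and adds url/http_options as alternatives to a custom file_loader, and patch 3 defaults data_spec_version to "v1".
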
From 17affeccbf469504b6e1968efaf2ef0cee54c64b Mon Sep 17 00:00:00 2001 From: Koen Vossen Date: Mon, 16 Sep 2024 22:20:15 +0200 Subject: [PATCH 2/4] WIP --- ingestify/application/dataset_store.py | 16 +- ingestify/application/loader.py | 123 ++++++++------ ingestify/domain/models/dataset/identifier.py | 39 ++--- ingestify/domain/models/fetch_policy.py | 30 ++-- .../models/resources/dataset_resource.py | 49 ++++-- ingestify/domain/models/source.py | 9 - ingestify/exceptions.py | 4 + .../store/dataset/sqlalchemy/repository.py | 2 - ingestify/tests/test_engine.py | 154 ++++++++++-------- 9 files changed, 247 insertions(+), 179 deletions(-) diff --git a/ingestify/application/dataset_store.py b/ingestify/application/dataset_store.py index f7a5a68..e2032aa 100644 --- a/ingestify/application/dataset_store.py +++ b/ingestify/application/dataset_store.py @@ -9,6 +9,7 @@ from typing import Dict, List, Optional, Union, Callable, BinaryIO +from ingestify.domain.models.dataset.dataset import DatasetState from ingestify.domain.models.dataset.events import RevisionAdded, MetadataUpdated from ingestify.domain.models.dataset.file_collection import FileCollection from ingestify.domain.models.event import EventBus @@ -209,9 +210,9 @@ def update_dataset( ): """The add_revision will also save the dataset.""" metadata_changed = False - if dataset.update_from_identifier(dataset_identifier): - self.dataset_repository.save(bucket=self.bucket, dataset=dataset) - metadata_changed = True + # if dataset.update_from_identifier(dataset_identifier): + # self.dataset_repository.save(bucket=self.bucket, dataset=dataset) + # metadata_changed = True self.add_revision(dataset, files) @@ -229,6 +230,9 @@ def create_dataset( dataset_type: str, provider: str, dataset_identifier: Identifier, + name: str, + state: DatasetState, + metadata: dict, files: Dict[str, DraftFile], description: str = "Create", ): @@ -237,12 +241,12 @@ def create_dataset( dataset = Dataset( bucket=self.bucket, dataset_id=self.dataset_repository.next_identity(), - name=dataset_identifier.name, - state=dataset_identifier.state, + name=name, + state=state, identifier=dataset_identifier, dataset_type=dataset_type, provider=provider, - metadata=dataset_identifier.metadata, + metadata=metadata, created_at=now, updated_at=now, ) diff --git a/ingestify/application/loader.py b/ingestify/application/loader.py index 0e9fe15..4131f3e 100644 --- a/ingestify/application/loader.py +++ b/ingestify/application/loader.py @@ -1,15 +1,19 @@ import itertools +import json import logging import platform from multiprocessing import set_start_method, cpu_count -from typing import List +from typing import List, Optional from ingestify.domain.models import Dataset, Identifier, Selector, Source, Task, TaskSet from ingestify.utils import map_in_pool, TaskExecutor, chunker from .dataset_store import DatasetStore +from .. 
import DatasetResource, retrieve_http +from ..domain import DraftFile from ..domain.models.data_spec_version_collection import DataSpecVersionCollection from ..domain.models.extract_job import ExtractJob +from ..domain.models.resources.dataset_resource import FileResource from ..exceptions import ConfigurationError if platform.system() == "Darwin": @@ -39,69 +43,92 @@ def to_batches(input_): return batches +def load_file( + file_resource: FileResource, dataset: Optional[Dataset] = None +) -> Optional[DraftFile]: + current_file = None + if dataset: + current_file = dataset.current_revision.modified_files_map.get( + file_resource.file_id + ) + + if file_resource.json_content: + return DraftFile.from_input( + file_=json.dumps(file_resource.json_content, indent=4), + data_serialization_format="json", + data_feed_key=file_resource.data_feed_key, + data_spec_version=file_resource.data_spec_version, + modified_at=file_resource.last_modified, + ) + elif file_resource.url: + http_options = {} + if file_resource.http_options: + for k, v in file_resource.http_options.items(): + http_options[f"http_{k}"] = v + + return retrieve_http( + url=file_resource.url, + current_file=current_file, + file_data_feed_key=file_resource.data_feed_key, + file_data_spec_version=file_resource.data_spec_version, + file_data_serialization_format=file_resource.data_serialization_format + or "txt", + **http_options, + ) + else: + return file_resource.file_loader(file_resource, current_file) + + class UpdateDatasetTask(Task): def __init__( self, - source: Source, dataset: Dataset, - dataset_identifier: Identifier, - data_spec_versions: DataSpecVersionCollection, + dataset_resource: DatasetResource, store: DatasetStore, ): - self.source = source self.dataset = dataset - self.dataset_identifier = dataset_identifier - self.data_spec_versions = data_spec_versions + self.dataset_resource = dataset_resource self.store = store def run(self): - files = self.source.fetch_dataset_files( - self.dataset.dataset_type, - self.dataset_identifier, # Use the new dataset_identifier as it's more up-to-date, and contains more info - data_spec_versions=self.data_spec_versions, - current_revision=self.dataset.current_revision, - ) self.store.update_dataset( dataset=self.dataset, - dataset_identifier=self.dataset_identifier, - files=files, + dataset_identifier=Identifier(self.dataset_resource.dataset_resource_id), + files={ + file_id: load_file(file_resource, dataset=self.dataset) + for file_id, file_resource in self.dataset_resource.files.items() + }, ) def __repr__(self): - return f"UpdateDatasetTask({self.source} -> {self.dataset.identifier})" + return f"UpdateDatasetTask({self.dataset_resource.provider} -> {self.dataset_resource.dataset_resource_id})" class CreateDatasetTask(Task): def __init__( self, - source: Source, - dataset_type: str, - data_spec_versions: DataSpecVersionCollection, - dataset_identifier: Identifier, + dataset_resource: DatasetResource, store: DatasetStore, ): - self.source = source - self.dataset_type = dataset_type - self.data_spec_versions = data_spec_versions - self.dataset_identifier = dataset_identifier + self.dataset_resource = dataset_resource self.store = store def run(self): - files = self.source.fetch_dataset_files( - dataset_type=self.dataset_type, - identifier=self.dataset_identifier, - data_spec_versions=self.data_spec_versions, - current_revision=None, - ) self.store.create_dataset( - dataset_type=self.dataset_type, - provider=self.source.provider, - dataset_identifier=self.dataset_identifier, - files=files, 
+ dataset_type=self.dataset_resource.dataset_type, + provider=self.dataset_resource.provider, + dataset_identifier=Identifier(**self.dataset_resource.dataset_resource_id), + name=self.dataset_resource.name, + state=self.dataset_resource.state, + metadata=self.dataset_resource.metadata, + files={ + file_id: load_file(file_resource) + for file_id, file_resource in self.dataset_resource.files.items() + }, ) def __repr__(self): - return f"CreateDatasetTask({self.source} -> {self.dataset_identifier})" + return f"CreateDatasetTask({self.dataset_resource.provider} -> {self.dataset_resource.dataset_resource_id})" class Loader: @@ -204,7 +231,9 @@ def run_task(task): for batch in batches: dataset_identifiers = [ - Identifier.create_from(selector, **dataset_resource.dataset_resource_id) + Identifier.create_from_selector( + selector, **dataset_resource.dataset_resource_id + ) # We have to pass the data_spec_versions here as a Source can add some # extra data to the identifier which is retrieved in a certain data format for dataset_resource in batch @@ -213,7 +242,8 @@ def run_task(task): # Load all available datasets based on the discovered dataset identifiers dataset_collection = self.store.get_dataset_collection( dataset_type=extract_job.dataset_type, - provider=extract_job.source.provider, + # Assume all DatasetResources share the same provider + provider=batch[0].provider, selector=dataset_identifiers, ) @@ -221,30 +251,29 @@ def run_task(task): total_dataset_count += len(dataset_identifiers) task_set = TaskSet() - for dataset_identifier in dataset_identifiers: + for dataset_resource in batch: + dataset_identifier = Identifier.create_from_selector( + selector, **dataset_resource.dataset_resource_id + ) + if dataset := dataset_collection.get(dataset_identifier): if extract_job.fetch_policy.should_refetch( - dataset, dataset_identifier + dataset, dataset_resource ): task_set.add( UpdateDatasetTask( - source=extract_job.source, dataset=dataset, # Current dataset from the database - dataset_identifier=dataset_identifier, # Most recent dataset_identifier - data_spec_versions=selector.data_spec_versions, + dataset_resource=dataset_resource, # Most recent dataset_resource store=self.store, ) ) else: skip_count += 1 else: - if extract_job.fetch_policy.should_fetch(dataset_identifier): + if extract_job.fetch_policy.should_fetch(dataset_resource): task_set.add( CreateDatasetTask( - source=extract_job.source, - dataset_type=extract_job.dataset_type, - dataset_identifier=dataset_identifier, - data_spec_versions=selector.data_spec_versions, + dataset_resource=dataset_resource, store=self.store, ) ) diff --git a/ingestify/domain/models/dataset/identifier.py b/ingestify/domain/models/dataset/identifier.py index 5789af0..750f05d 100644 --- a/ingestify/domain/models/dataset/identifier.py +++ b/ingestify/domain/models/dataset/identifier.py @@ -1,33 +1,24 @@ -from datetime import datetime -from typing import Optional, TYPE_CHECKING, Dict +from typing import TYPE_CHECKING -from ingestify.utils import AttributeBag +from ingestify.utils import key_from_dict if TYPE_CHECKING: - from ingestify.domain.models.dataset.dataset import DatasetState + from ingestify.domain import Selector -class Identifier(AttributeBag): - @property - def last_modified(self) -> Optional[datetime]: - return self.attributes.get("_last_modified") - - @property - def name(self) -> Optional[str]: - return self.attributes.get("_name") +class Identifier(dict): + @classmethod + def create_from_selector(cls, selector: "Selector", **kwargs): + 
identifier = cls(**selector.filtered_attributes) + identifier.update(kwargs) + return identifier @property - def metadata(self) -> dict: - return self.attributes.get("_metadata", {}) + def key(self): + return key_from_dict(self) - @property - def state(self) -> "DatasetState": - from ingestify.domain.models.dataset.dataset import DatasetState - - return self.attributes.get("_state", DatasetState.SCHEDULED) + def __hash__(self): + return hash(self.key) - @property - def files_last_modified(self) -> Optional[Dict[str, datetime]]: - """Return last modified per file. This makes it possible to detect when a file is added with an older - last_modified than current dataset.""" - return self.attributes.get("_files_last_modified") + def __str__(self): + return "/".join([f"{k}={v}" for k, v in self.items()]) diff --git a/ingestify/domain/models/fetch_policy.py b/ingestify/domain/models/fetch_policy.py index d0089b4..3856b8e 100644 --- a/ingestify/domain/models/fetch_policy.py +++ b/ingestify/domain/models/fetch_policy.py @@ -1,6 +1,6 @@ from datetime import timedelta -from ingestify.domain import Dataset, Identifier +from ingestify.domain import Dataset, Identifier, DatasetResource from ingestify.utils import utcnow @@ -10,25 +10,31 @@ def __init__(self): self.min_age = utcnow() - timedelta(days=2) self.last_change = utcnow() - timedelta(days=1) - def should_fetch(self, dataset_identifier: Identifier) -> bool: + def should_fetch(self, dataset_resource: DatasetResource) -> bool: # this is called when dataset does not exist yet return True - def should_refetch(self, dataset: Dataset, identifier: Identifier) -> bool: + def should_refetch( + self, dataset: Dataset, dataset_resource: DatasetResource + ) -> bool: current_revision = dataset.current_revision if not dataset.revisions: # TODO: this is weird? Dataset without any data. Fetch error? 
return True elif current_revision: - if identifier.files_last_modified: - if current_revision.is_changed(identifier.files_last_modified): - return True + files_last_modified = { + file.file_id: file.last_modified + for file in dataset_resource.files.values() + } + if current_revision.is_changed(files_last_modified): + return True - else: - if ( - identifier.last_modified - and current_revision.created_at < identifier.last_modified - ): - return True + # We don't set last_modified on Dataset level anymore, only on file level + # else: + # if ( + # identifier.last_modified + # and current_revision.created_at < identifier.last_modified + # ): + # return True return False diff --git a/ingestify/domain/models/resources/dataset_resource.py b/ingestify/domain/models/resources/dataset_resource.py index 124cc2e..b447b6e 100644 --- a/ingestify/domain/models/resources/dataset_resource.py +++ b/ingestify/domain/models/resources/dataset_resource.py @@ -9,6 +9,7 @@ @dataclass(frozen=True) class FileResource: + dataset_resource: "DatasetResource" file_id: str last_modified: datetime data_feed_key: str @@ -17,46 +18,70 @@ class FileResource: # DataSerializationFormat is "json" in case of json_content, otherwise file_loader will return it # data_serialization_format: str - json_content: Optional[str] = None + json_content: Optional[dict] = None + + url: Optional[str] = None + http_options: Optional[dict] = None + data_serialization_format: Optional[str] = None + file_loader: Optional[ - Callable[ - ["DatasetResource", "FileResource", Optional["File"]], Optional["DraftFile"] - ] + Callable[["FileResource", Optional["File"]], Optional["DraftFile"]] ] = None class DatasetResource: def __init__( - self, dataset_resource_id: dict, name: str, metadata: dict, state: "DatasetState" + self, + dataset_resource_id: dict, + /, + dataset_type: str, + provider: str, + name: str, + metadata: Optional[dict] = None, + state: Optional["DatasetState"] = None, ): + from ingestify.domain.models.dataset.dataset import DatasetState + + self.dataset_type = dataset_type + self.provider = provider self.dataset_resource_id = dataset_resource_id self.name = name - self.metadata = metadata - self.state = state + self.metadata = metadata or {} + self.state = state or DatasetState.COMPLETE - self._files = {} + self.files = {} - def add_file_resource( + def add_file( self, last_modified: datetime, data_feed_key: str, data_spec_version: str, - json_content: Optional[str] = None, + json_content: Optional[dict] = None, + url: Optional[str] = None, + http_options: Optional[dict] = None, + data_serialization_format: Optional[str] = None, file_loader: Optional[ Callable[ - ["DatasetResource", "FileResource", Optional["File"]], Optional["DraftFile"] + ["FileResource", Optional["File"]], + Optional["DraftFile"], ] ] = None, ): file_id = f"{data_feed_key}__{data_spec_version}" + if file_id in self.files: + raise DuplicateFile(f"File with id {file_id} already exists.") file_resource = FileResource( + dataset_resource=self, file_id=file_id, data_feed_key=data_feed_key, data_spec_version=data_spec_version, last_modified=last_modified, json_content=json_content, + url=url, + http_options=http_options, + data_serialization_format=data_serialization_format, file_loader=file_loader, ) - self._files[file_id] = file_resource + self.files[file_id] = file_resource diff --git a/ingestify/domain/models/source.py b/ingestify/domain/models/source.py index 15588f5..3dd6f6c 100644 --- a/ingestify/domain/models/source.py +++ b/ingestify/domain/models/source.py @@ 
-1,11 +1,7 @@ from abc import ABC, abstractmethod from typing import Dict, List, Optional, Iterable, Iterator, Union -# from ingestify.utils import ComponentFactory, ComponentRegistry - -from . import DraftFile from .data_spec_version_collection import DataSpecVersionCollection -from .dataset import Identifier, Revision from .dataset.collection_metadata import DatasetCollectionMetadata from .resources.dataset_resource import DatasetResource @@ -14,11 +10,6 @@ class Source(ABC): def __init__(self, name: str, **kwargs): self.name = name - @property - @abstractmethod - def provider(self) -> str: - pass - # TODO: consider making this required... # @abstractmethod # def discover_selectors(self, dataset_type: str) -> List[Dict]: diff --git a/ingestify/exceptions.py b/ingestify/exceptions.py index 23664bd..7a9e314 100644 --- a/ingestify/exceptions.py +++ b/ingestify/exceptions.py @@ -4,3 +4,7 @@ class IngestifyError(Exception): class ConfigurationError(IngestifyError): pass + + +class DuplicateFile(IngestifyError): + pass diff --git a/ingestify/infra/store/dataset/sqlalchemy/repository.py b/ingestify/infra/store/dataset/sqlalchemy/repository.py index cd35a0a..45ee720 100644 --- a/ingestify/infra/store/dataset/sqlalchemy/repository.py +++ b/ingestify/infra/store/dataset/sqlalchemy/repository.py @@ -30,8 +30,6 @@ def parse_value(v): def json_serializer(o): - if isinstance(o, Identifier): - o = o.filtered_attributes return json.dumps(o) diff --git a/ingestify/tests/test_engine.py b/ingestify/tests/test_engine.py index 6c3855b..e2d103e 100644 --- a/ingestify/tests/test_engine.py +++ b/ingestify/tests/test_engine.py @@ -3,7 +3,7 @@ import pytz -from ingestify import Source +from ingestify import Source, DatasetResource from ingestify.application.ingestion_engine import IngestionEngine from ingestify.domain import ( Identifier, @@ -12,6 +12,9 @@ DraftFile, Revision, ) +from ingestify.domain.models.dataset.collection_metadata import ( + DatasetCollectionMetadata, +) from ingestify.domain.models.extract_job import ExtractJob from ingestify.domain.models.fetch_policy import FetchPolicy from ingestify.main import get_engine @@ -31,49 +34,72 @@ def add_extract_job(engine: IngestionEngine, source: Source, **selector): ) -class SimpleFakeSource(Source): - @property - def provider(self) -> str: - return "fake" +def file_loader(file_resource, current_file): + if file_resource.file_id == "file1__v1": + if not current_file: + return DraftFile.from_input( + "content1", + ) + else: + return DraftFile.from_input( + "different_content", + ) - def discover_datasets( + elif file_resource.file_id == "file2__v1": + return DraftFile.from_input( + "some_content" + str(file_resource.dataset_resource.dataset_resource_id) + ) + + +class SimpleFakeSource(Source): + def find_datasets( self, dataset_type: str, data_spec_versions: DataSpecVersionCollection, + dataset_collection_metadata: DatasetCollectionMetadata, competition_id, season_id, **kwargs ): - return [ + dataset_resource = DatasetResource( dict( competition_id=competition_id, season_id=season_id, - _name="Test Dataset", - _last_modified=datetime.now(pytz.utc), - ) - ] + ), + provider="fake", + dataset_type="match", + name="Test Dataset", + ) - def fetch_dataset_files( - self, - dataset_type: str, - identifier: Identifier, - data_spec_versions: DataSpecVersionCollection, - current_revision: Optional[Revision], - ): - if current_revision: - return { - "file1": DraftFile.from_input( - "different_content", - ), - "file2": DraftFile.from_input("some_content" + 
identifier.key), - } - else: - return { - "file1": DraftFile.from_input( - "content1", - ), - "file2": DraftFile.from_input("some_content" + identifier.key), - } + last_modified = datetime.now(pytz.utc) + + dataset_resource.add_file( + last_modified=last_modified, + data_feed_key="file1", + data_spec_version="v1", + file_loader=file_loader, + ) + dataset_resource.add_file( + last_modified=last_modified, + data_feed_key="file2", + data_spec_version="v1", + file_loader=file_loader, + ) + dataset_resource.add_file( + last_modified=last_modified, + data_feed_key="file3", + data_spec_version="v1", + json_content={"test": "some-content"}, + ) + # dataset_resource.add_file( + # last_modified=last_modified, + # data_feed_key="file4", + # data_spec_version="v1", + # url="https://raw.githubusercontent.com/statsbomb/open-data/refs/heads/master/data/three-sixty/3788741.json", + # data_serialization_format="json" + # ) + + yield dataset_resource class BatchSource(Source): @@ -83,14 +109,11 @@ def __init__(self, name, callback): self.should_stop = False self.idx = 0 - @property - def provider(self) -> str: - return "fake" - - def discover_datasets( + def find_datasets( self, dataset_type: str, data_spec_versions: DataSpecVersionCollection, + dataset_collection_metadata: DatasetCollectionMetadata, competition_id, season_id, **kwargs @@ -100,39 +123,36 @@ def discover_datasets( for i in range(10): match_id = self.idx self.idx += 1 - item = dict( - competition_id=competition_id, - season_id=season_id, - match_id=match_id, - _name="Test Dataset", - _last_modified=datetime.now(pytz.utc), + dataset_resource = DatasetResource( + dict( + competition_id=competition_id, + season_id=season_id, + match_id=match_id, + ), + name="Test dataset", + provider="fake", + dataset_type="match", + ) + + last_modified = datetime.now(pytz.utc) + + dataset_resource.add_file( + last_modified=last_modified, + data_feed_key="file1", + data_spec_version="v1", + file_loader=file_loader, ) - items.append(item) + dataset_resource.add_file( + last_modified=last_modified, + data_feed_key="file2", + data_spec_version="v1", + file_loader=file_loader, + ) + + items.append(dataset_resource) yield items self.callback and self.callback(self.idx) - def fetch_dataset_files( - self, - dataset_type: str, - identifier: Identifier, - data_spec_versions: DataSpecVersionCollection, - current_revision: Optional[Revision], - ): - if current_revision: - return { - "file1": DraftFile.from_input( - "different_content", - ), - "file2": DraftFile.from_input("some_content" + identifier.key), - } - else: - return { - "file1": DraftFile.from_input( - "content1", - ), - "file2": DraftFile.from_input("some_content" + identifier.key), - } - def test_engine(config_file): engine = get_engine(config_file, "main") @@ -155,7 +175,7 @@ def test_engine(config_file): dataset = datasets.first() assert dataset.identifier == Identifier(competition_id=1, season_id=2) assert len(dataset.revisions) == 2 - assert len(dataset.revisions[0].modified_files) == 2 + assert len(dataset.revisions[0].modified_files) == 3 assert len(dataset.revisions[1].modified_files) == 1 add_extract_job( From a48b1fe717429c7fdf319a3b4f441efbc3e4f56f Mon Sep 17 00:00:00 2001 From: Koen Vossen Date: Tue, 17 Sep 2024 14:57:58 +0200 Subject: [PATCH 3/4] Finish DatasetResource --- ingestify/application/dataset_store.py | 9 +++++---- ingestify/application/loader.py | 7 ++++--- ingestify/domain/models/dataset/dataset.py | 14 +++++++------- .../domain/models/resources/dataset_resource.py | 11 ++++++++++- 4 
files changed, 26 insertions(+), 15 deletions(-) diff --git a/ingestify/application/dataset_store.py b/ingestify/application/dataset_store.py index e2032aa..f2c1934 100644 --- a/ingestify/application/dataset_store.py +++ b/ingestify/application/dataset_store.py @@ -17,6 +17,7 @@ Dataset, DatasetCollection, DatasetRepository, + DatasetResource, DraftFile, File, LoadedFile, @@ -205,14 +206,14 @@ def add_revision( def update_dataset( self, dataset: Dataset, - dataset_identifier: Identifier, + dataset_resource: DatasetResource, files: Dict[str, DraftFile], ): """The add_revision will also save the dataset.""" metadata_changed = False - # if dataset.update_from_identifier(dataset_identifier): - # self.dataset_repository.save(bucket=self.bucket, dataset=dataset) - # metadata_changed = True + if dataset.update_from_resource(dataset_resource): + self.dataset_repository.save(bucket=self.bucket, dataset=dataset) + metadata_changed = True self.add_revision(dataset, files) diff --git a/ingestify/application/loader.py b/ingestify/application/loader.py index 4131f3e..4b69b14 100644 --- a/ingestify/application/loader.py +++ b/ingestify/application/loader.py @@ -25,7 +25,7 @@ logger = logging.getLogger(__name__) -DEFAULT_CHUNK_SIZE = 100 +DEFAULT_CHUNK_SIZE = 1000 def to_batches(input_): @@ -52,7 +52,8 @@ def load_file( file_resource.file_id ) - if file_resource.json_content: + if file_resource.json_content is not None: + # Empty dictionary is allowed return DraftFile.from_input( file_=json.dumps(file_resource.json_content, indent=4), data_serialization_format="json", @@ -93,7 +94,7 @@ def __init__( def run(self): self.store.update_dataset( dataset=self.dataset, - dataset_identifier=Identifier(self.dataset_resource.dataset_resource_id), + dataset_resource=self.dataset_resource, files={ file_id: load_file(file_resource, dataset=self.dataset) for file_id, file_resource in self.dataset_resource.files.items() diff --git a/ingestify/domain/models/dataset/dataset.py b/ingestify/domain/models/dataset/dataset.py index d39bd05..a1309b3 100644 --- a/ingestify/domain/models/dataset/dataset.py +++ b/ingestify/domain/models/dataset/dataset.py @@ -53,18 +53,18 @@ def add_revision(self, revision: Revision): self.revisions.append(revision) self.updated_at = utcnow() - def update_from_identifier(self, dataset_identifier: Identifier) -> bool: + def update_from_resource(self, dataset_resource) -> bool: changed = False - if self.name != dataset_identifier.name: - self.name = dataset_identifier.name + if self.name != dataset_resource.name: + self.name = dataset_resource.name changed = True - if self.metadata != dataset_identifier.metadata: - self.metadata = dataset_identifier.metadata + if self.metadata != dataset_resource.metadata: + self.metadata = dataset_resource.metadata changed = True - if self.state != dataset_identifier.state: - self.state = dataset_identifier.state + if self.state != dataset_resource.state: + self.state = dataset_resource.state changed = True if changed: diff --git a/ingestify/domain/models/resources/dataset_resource.py b/ingestify/domain/models/resources/dataset_resource.py index b447b6e..67e8d4d 100644 --- a/ingestify/domain/models/resources/dataset_resource.py +++ b/ingestify/domain/models/resources/dataset_resource.py @@ -2,6 +2,8 @@ from datetime import datetime from typing import Optional, Callable, TYPE_CHECKING +from ingestify.exceptions import DuplicateFile + if TYPE_CHECKING: from ingestify.domain import DraftFile, File from ingestify.domain.models.dataset.dataset import DatasetState @@ 
-28,6 +30,12 @@ class FileResource: Callable[["FileResource", Optional["File"]], Optional["DraftFile"]] ] = None + def __post_init__(self): + if self.json_content is None and not self.url and not self.file_loader: + raise TypeError( + "You need to specify `json_content`, `url` or a custom `file_loader`" + ) + class DatasetResource: def __init__( @@ -55,7 +63,8 @@ def add_file( self, last_modified: datetime, data_feed_key: str, - data_spec_version: str, + # Some sources might not have a DataSpecVersion. Set a default + data_spec_version: str = "v1", json_content: Optional[dict] = None, url: Optional[str] = None, http_options: Optional[dict] = None, From 0f993f20dad3ba34bc48df6eb47d273451bf646d Mon Sep 17 00:00:00 2001 From: Koen Vossen Date: Tue, 17 Sep 2024 21:57:50 +0200 Subject: [PATCH 4/4] Minor fixes --- ingestify/application/ingestion_engine.py | 4 +- ingestify/application/loader.py | 52 ++++++++++++++++------- ingestify/cmdline.py | 14 ++++-- ingestify/utils.py | 22 +++++++++- 4 files changed, 70 insertions(+), 22 deletions(-) diff --git a/ingestify/application/ingestion_engine.py b/ingestify/application/ingestion_engine.py index f649a54..ead1859 100644 --- a/ingestify/application/ingestion_engine.py +++ b/ingestify/application/ingestion_engine.py @@ -21,8 +21,8 @@ def __init__(self, store: DatasetStore): def add_extract_job(self, extract_job: ExtractJob): self.loader.add_extract_job(extract_job) - def load(self): - self.loader.collect_and_run() + def load(self, dry_run: bool = False): + self.loader.collect_and_run(dry_run=dry_run) def list_datasets(self, as_count: bool = False): """Consider moving this to DataStore""" diff --git a/ingestify/application/loader.py b/ingestify/application/loader.py index 4b69b14..5960034 100644 --- a/ingestify/application/loader.py +++ b/ingestify/application/loader.py @@ -33,7 +33,12 @@ def to_batches(input_): batches = [input_] else: # Assume it's an iterator. 
Peek what's inside, and put it back - peek = next(input_) + try: + peek = next(input_) + except StopIteration: + # Nothing to batch + return [] + input_ = itertools.chain([peek], input_) if not isinstance(peek, list): @@ -140,7 +145,7 @@ def __init__(self, store: DatasetStore): def add_extract_job(self, extract_job: ExtractJob): self.extract_jobs.append(extract_job) - def collect_and_run(self): + def collect_and_run(self, dry_run: bool = False): total_dataset_count = 0 # First collect all selectors, before discovering datasets @@ -154,7 +159,9 @@ def collect_and_run(self): dynamic_selectors = [ selector for selector in extract_job.selectors if selector.is_dynamic ] - if dynamic_selectors: + + no_selectors = len(static_selectors) == 1 and not bool(static_selectors[0]) + if dynamic_selectors or no_selectors: if hasattr(extract_job.source, "discover_selectors"): logger.debug( f"Discovering selectors from {extract_job.source.__class__.__name__}" @@ -165,29 +172,44 @@ def collect_and_run(self): all_selectors = extract_job.source.discover_selectors( extract_job.dataset_type ) - extra_static_selectors = [] - for dynamic_selector in dynamic_selectors: - dynamic_job_selectors = [ + if no_selectors: + # When there were no selectors specified, just use all of them + extra_static_selectors = [ Selector.build( job_selector, data_spec_versions=extract_job.data_spec_versions, ) for job_selector in all_selectors - if dynamic_selector.is_match(job_selector) ] - extra_static_selectors.extend(dynamic_job_selectors) - logger.info(f"Added {len(dynamic_job_selectors)} selectors") + static_selectors = [] + else: + extra_static_selectors = [] + for dynamic_selector in dynamic_selectors: + dynamic_job_selectors = [ + Selector.build( + job_selector, + data_spec_versions=extract_job.data_spec_versions, + ) + for job_selector in all_selectors + if dynamic_selector.is_match(job_selector) + ] + extra_static_selectors.extend(dynamic_job_selectors) + logger.info(f"Added {len(dynamic_job_selectors)} selectors") static_selectors.extend(extra_static_selectors) + logger.info( f"Discovered {len(extra_static_selectors)} selectors from {extract_job.source.__class__.__name__}" ) else: - raise ConfigurationError( - f"Dynamic selectors cannot be used for " - f"{extract_job.source.__class__.__name__} because it doesn't support" - f" selector discovery" - ) + if not no_selectors: + # When there are no selectors and no discover_selectors, just pass it through. It might break + # later on + raise ConfigurationError( + f"Dynamic selectors cannot be used for " + f"{extract_job.source.__class__.__name__} because it doesn't support" + f" selector discovery" + ) # Merge selectors when source, dataset_type and actual selector is the same. 
This makes # sure there will be only 1 dataset for this combination @@ -204,7 +226,7 @@ def run_task(task): logger.info(f"Running task {task}") task.run() - task_executor = TaskExecutor() + task_executor = TaskExecutor(dry_run=dry_run) for extract_job, selector in selectors.values(): logger.debug( diff --git a/ingestify/cmdline.py b/ingestify/cmdline.py index f82a39e..b242f65 100644 --- a/ingestify/cmdline.py +++ b/ingestify/cmdline.py @@ -57,8 +57,16 @@ def cli(): help="bucket", type=str, ) -@click.option("--debug", "debug", required=False, help="Debugging enabled", type=bool) -def run(config_file: str, bucket: Optional[str], debug: Optional[bool]): +# @click.option("--debug", "debug", required=False, help="Debugging enabled", type=bool) +@click.option( + "--dry-run", + "dry_run", + required=False, + help="Dry run - don't store anything", + is_flag=True, + type=bool, +) +def run(config_file: str, bucket: Optional[str], dry_run: Optional[bool]): try: engine = get_engine(config_file, bucket) except ConfigurationError as e: @@ -68,7 +76,7 @@ def run(config_file: str, bucket: Optional[str], debug: Optional[bool]): logger.exception(f"Failed due a configuration error: {e}") sys.exit(1) - engine.load() + engine.load(dry_run=dry_run) logger.info("Done") diff --git a/ingestify/utils.py b/ingestify/utils.py index d5d4742..a1dfe32 100644 --- a/ingestify/utils.py +++ b/ingestify/utils.py @@ -1,5 +1,6 @@ import abc import inspect +import logging import os import time import re @@ -16,6 +17,9 @@ from itertools import islice +logger = logging.getLogger(__name__) + + def chunker(it, size): iterator = iter(it) while chunk := list(islice(iterator, size)): @@ -219,9 +223,23 @@ def close(self): return True +class DummyPool: + def map_async(self, func, iterable): + logger.info(f"DummyPool: not running {len(list(iterable))} tasks") + return None + + def join(self): + return True + + def close(self): + return True + + class TaskExecutor: - def __init__(self, processes=0): - if os.environ.get("INGESTIFY_RUN_EAGER") == "true": + def __init__(self, processes=0, dry_run: bool = False): + if dry_run: + pool = DummyPool() + elif os.environ.get("INGESTIFY_RUN_EAGER") == "true": pool = SyncPool() else: if not processes: