Commit 17affec

WIP
koenvo committed Sep 16, 2024
1 parent d980dc5 commit 17affec
Showing 9 changed files with 247 additions and 179 deletions.
16 changes: 10 additions & 6 deletions ingestify/application/dataset_store.py
@@ -9,6 +9,7 @@

from typing import Dict, List, Optional, Union, Callable, BinaryIO

from ingestify.domain.models.dataset.dataset import DatasetState
from ingestify.domain.models.dataset.events import RevisionAdded, MetadataUpdated
from ingestify.domain.models.dataset.file_collection import FileCollection
from ingestify.domain.models.event import EventBus
@@ -209,9 +210,9 @@ def update_dataset(
    ):
        """The add_revision will also save the dataset."""
        metadata_changed = False
        if dataset.update_from_identifier(dataset_identifier):
            self.dataset_repository.save(bucket=self.bucket, dataset=dataset)
            metadata_changed = True
        # if dataset.update_from_identifier(dataset_identifier):
        #     self.dataset_repository.save(bucket=self.bucket, dataset=dataset)
        #     metadata_changed = True

        self.add_revision(dataset, files)

@@ -229,6 +230,9 @@ def create_dataset(
        dataset_type: str,
        provider: str,
        dataset_identifier: Identifier,
        name: str,
        state: DatasetState,
        metadata: dict,
        files: Dict[str, DraftFile],
        description: str = "Create",
    ):
@@ -237,12 +241,12 @@
        dataset = Dataset(
            bucket=self.bucket,
            dataset_id=self.dataset_repository.next_identity(),
            name=dataset_identifier.name,
            state=dataset_identifier.state,
            name=name,
            state=state,
            identifier=dataset_identifier,
            dataset_type=dataset_type,
            provider=provider,
            metadata=dataset_identifier.metadata,
            metadata=metadata,
            created_at=now,
            updated_at=now,
        )
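Not part of the commit, but to make the dataset_store.py change concrete: create_dataset no longer pulls name, state and metadata off the Identifier; callers pass them explicitly, as CreateDatasetTask.run now does in loader.py below. A minimal, hypothetical call under assumed values (the store instance, provider, identifier fields and draft_files are all made up for illustration):

    # Hypothetical caller sketch: name/state/metadata are now explicit arguments
    # instead of hiding in underscore-prefixed Identifier attributes.
    store.create_dataset(
        dataset_type="match",                              # assumed dataset_type
        provider="statsbomb",                              # assumed provider name
        dataset_identifier=Identifier(competition_id=11, season_id=90),
        name="La Liga 2020/2021",                          # was dataset_identifier.name
        state=DatasetState.SCHEDULED,                      # was dataset_identifier.state
        metadata={"source": "open-data"},                  # was dataset_identifier.metadata
        files=draft_files,                                 # Dict[str, DraftFile], assumed
    )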
123 changes: 76 additions & 47 deletions ingestify/application/loader.py
@@ -1,15 +1,19 @@
import itertools
import json
import logging
import platform
from multiprocessing import set_start_method, cpu_count
from typing import List
from typing import List, Optional

from ingestify.domain.models import Dataset, Identifier, Selector, Source, Task, TaskSet
from ingestify.utils import map_in_pool, TaskExecutor, chunker

from .dataset_store import DatasetStore
from .. import DatasetResource, retrieve_http
from ..domain import DraftFile
from ..domain.models.data_spec_version_collection import DataSpecVersionCollection
from ..domain.models.extract_job import ExtractJob
from ..domain.models.resources.dataset_resource import FileResource
from ..exceptions import ConfigurationError

if platform.system() == "Darwin":
@@ -39,69 +43,92 @@ def to_batches(input_):
    return batches


def load_file(
    file_resource: FileResource, dataset: Optional[Dataset] = None
) -> Optional[DraftFile]:
    current_file = None
    if dataset:
        current_file = dataset.current_revision.modified_files_map.get(
            file_resource.file_id
        )

    if file_resource.json_content:
        return DraftFile.from_input(
            file_=json.dumps(file_resource.json_content, indent=4),
            data_serialization_format="json",
            data_feed_key=file_resource.data_feed_key,
            data_spec_version=file_resource.data_spec_version,
            modified_at=file_resource.last_modified,
        )
    elif file_resource.url:
        http_options = {}
        if file_resource.http_options:
            for k, v in file_resource.http_options.items():
                http_options[f"http_{k}"] = v

        return retrieve_http(
            url=file_resource.url,
            current_file=current_file,
            file_data_feed_key=file_resource.data_feed_key,
            file_data_spec_version=file_resource.data_spec_version,
            file_data_serialization_format=file_resource.data_serialization_format
            or "txt",
            **http_options,
        )
    else:
        return file_resource.file_loader(file_resource, current_file)


class UpdateDatasetTask(Task):
    def __init__(
        self,
        source: Source,
        dataset: Dataset,
        dataset_identifier: Identifier,
        data_spec_versions: DataSpecVersionCollection,
        dataset_resource: DatasetResource,
        store: DatasetStore,
    ):
        self.source = source
        self.dataset = dataset
        self.dataset_identifier = dataset_identifier
        self.data_spec_versions = data_spec_versions
        self.dataset_resource = dataset_resource
        self.store = store

    def run(self):
        files = self.source.fetch_dataset_files(
            self.dataset.dataset_type,
            self.dataset_identifier,  # Use the new dataset_identifier as it's more up-to-date, and contains more info
            data_spec_versions=self.data_spec_versions,
            current_revision=self.dataset.current_revision,
        )
        self.store.update_dataset(
            dataset=self.dataset,
            dataset_identifier=self.dataset_identifier,
            files=files,
            dataset_identifier=Identifier(self.dataset_resource.dataset_resource_id),
            files={
                file_id: load_file(file_resource, dataset=self.dataset)
                for file_id, file_resource in self.dataset_resource.files.items()
            },
        )

    def __repr__(self):
        return f"UpdateDatasetTask({self.source} -> {self.dataset.identifier})"
        return f"UpdateDatasetTask({self.dataset_resource.provider} -> {self.dataset_resource.dataset_resource_id})"


class CreateDatasetTask(Task):
    def __init__(
        self,
        source: Source,
        dataset_type: str,
        data_spec_versions: DataSpecVersionCollection,
        dataset_identifier: Identifier,
        dataset_resource: DatasetResource,
        store: DatasetStore,
    ):
        self.source = source
        self.dataset_type = dataset_type
        self.data_spec_versions = data_spec_versions
        self.dataset_identifier = dataset_identifier
        self.dataset_resource = dataset_resource
        self.store = store

    def run(self):
        files = self.source.fetch_dataset_files(
            dataset_type=self.dataset_type,
            identifier=self.dataset_identifier,
            data_spec_versions=self.data_spec_versions,
            current_revision=None,
        )
        self.store.create_dataset(
            dataset_type=self.dataset_type,
            provider=self.source.provider,
            dataset_identifier=self.dataset_identifier,
            files=files,
            dataset_type=self.dataset_resource.dataset_type,
            provider=self.dataset_resource.provider,
            dataset_identifier=Identifier(**self.dataset_resource.dataset_resource_id),
            name=self.dataset_resource.name,
            state=self.dataset_resource.state,
            metadata=self.dataset_resource.metadata,
            files={
                file_id: load_file(file_resource)
                for file_id, file_resource in self.dataset_resource.files.items()
            },
        )

    def __repr__(self):
        return f"CreateDatasetTask({self.source} -> {self.dataset_identifier})"
        return f"CreateDatasetTask({self.dataset_resource.provider} -> {self.dataset_resource.dataset_resource_id})"


class Loader:
@@ -204,7 +231,9 @@ def run_task(task):

        for batch in batches:
            dataset_identifiers = [
                Identifier.create_from(selector, **dataset_resource.dataset_resource_id)
                Identifier.create_from_selector(
                    selector, **dataset_resource.dataset_resource_id
                )
                # We have to pass the data_spec_versions here as a Source can add some
                # extra data to the identifier which is retrieved in a certain data format
                for dataset_resource in batch
@@ -213,38 +242,38 @@ def run_task(task):
            # Load all available datasets based on the discovered dataset identifiers
            dataset_collection = self.store.get_dataset_collection(
                dataset_type=extract_job.dataset_type,
                provider=extract_job.source.provider,
                # Assume all DatasetResources share the same provider
                provider=batch[0].provider,
                selector=dataset_identifiers,
            )

            skip_count = 0
            total_dataset_count += len(dataset_identifiers)

            task_set = TaskSet()
            for dataset_identifier in dataset_identifiers:
            for dataset_resource in batch:
                dataset_identifier = Identifier.create_from_selector(
                    selector, **dataset_resource.dataset_resource_id
                )

                if dataset := dataset_collection.get(dataset_identifier):
                    if extract_job.fetch_policy.should_refetch(
                        dataset, dataset_identifier
                        dataset, dataset_resource
                    ):
                        task_set.add(
                            UpdateDatasetTask(
                                source=extract_job.source,
                                dataset=dataset,  # Current dataset from the database
                                dataset_identifier=dataset_identifier,  # Most recent dataset_identifier
                                data_spec_versions=selector.data_spec_versions,
                                dataset_resource=dataset_resource,  # Most recent dataset_resource
                                store=self.store,
                            )
                        )
                    else:
                        skip_count += 1
                else:
                    if extract_job.fetch_policy.should_fetch(dataset_resource):
                        task_set.add(
                            CreateDatasetTask(
                                source=extract_job.source,
                                dataset_type=extract_job.dataset_type,
                                dataset_identifier=dataset_identifier,
                                data_spec_versions=selector.data_spec_versions,
                                dataset_resource=dataset_resource,
                                store=self.store,
                            )
                        )
)
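A side note on the new load_file helper above (illustrative, not part of the diff): when a FileResource carries a URL, every entry in its http_options dict is forwarded to retrieve_http with an http_ prefix on the key, and the current file from the dataset's latest revision is passed along, presumably so retrieve_http can avoid re-downloading unchanged content. A tiny sketch with assumed option values:

    # Hypothetical FileResource.http_options value, for illustration only
    http_options = {"headers": {"Authorization": "Bearer <token>"}, "timeout": 30}

    # load_file rewrites the keys before calling retrieve_http(url=..., **forwarded):
    forwarded = {f"http_{k}": v for k, v in http_options.items()}
    # -> {"http_headers": {...}, "http_timeout": 30}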
39 changes: 15 additions & 24 deletions ingestify/domain/models/dataset/identifier.py
@@ -1,33 +1,24 @@
from datetime import datetime
from typing import Optional, TYPE_CHECKING, Dict
from typing import TYPE_CHECKING

from ingestify.utils import AttributeBag
from ingestify.utils import key_from_dict

if TYPE_CHECKING:
    from ingestify.domain.models.dataset.dataset import DatasetState
    from ingestify.domain import Selector


class Identifier(AttributeBag):
    @property
    def last_modified(self) -> Optional[datetime]:
        return self.attributes.get("_last_modified")

    @property
    def name(self) -> Optional[str]:
        return self.attributes.get("_name")
class Identifier(dict):
    @classmethod
    def create_from_selector(cls, selector: "Selector", **kwargs):
        identifier = cls(**selector.filtered_attributes)
        identifier.update(kwargs)
        return identifier

    @property
    def metadata(self) -> dict:
        return self.attributes.get("_metadata", {})
    def key(self):
        return key_from_dict(self)

    @property
    def state(self) -> "DatasetState":
        from ingestify.domain.models.dataset.dataset import DatasetState

        return self.attributes.get("_state", DatasetState.SCHEDULED)
    def __hash__(self):
        return hash(self.key)

    @property
    def files_last_modified(self) -> Optional[Dict[str, datetime]]:
        """Return last modified per file. This makes it possible to detect when a file is added with an older
        last_modified than current dataset."""
        return self.attributes.get("_files_last_modified")
    def __str__(self):
        return "/".join([f"{k}={v}" for k, v in self.items()])
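For context (not part of the commit): Identifier is now a plain dict subclass, so the underscore-prefixed extras it used to carry (_name, _state, _metadata, _files_last_modified) move to DatasetResource, and the Identifier keeps only the key/value pairs that identify a dataset. A hypothetical usage sketch, assuming a selector whose filtered_attributes is {"competition_id": 11}:

    # Hypothetical values, for illustration only
    identifier = Identifier.create_from_selector(selector, season_id=90)
    identifier["competition_id"]  # -> 11, copied from selector.filtered_attributes
    identifier["season_id"]       # -> 90, from the resource's dataset_resource_id
    str(identifier)               # -> "competition_id=11/season_id=90"
    identifier.key                # stable key via key_from_dict; __hash__ hashes it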
30 changes: 18 additions & 12 deletions ingestify/domain/models/fetch_policy.py
@@ -1,6 +1,6 @@
from datetime import timedelta

from ingestify.domain import Dataset, Identifier
from ingestify.domain import Dataset, Identifier, DatasetResource
from ingestify.utils import utcnow


@@ -10,25 +10,31 @@ def __init__(self):
        self.min_age = utcnow() - timedelta(days=2)
        self.last_change = utcnow() - timedelta(days=1)

    def should_fetch(self, dataset_identifier: Identifier) -> bool:
    def should_fetch(self, dataset_resource: DatasetResource) -> bool:
        # this is called when dataset does not exist yet
        return True

    def should_refetch(self, dataset: Dataset, identifier: Identifier) -> bool:
    def should_refetch(
        self, dataset: Dataset, dataset_resource: DatasetResource
    ) -> bool:
        current_revision = dataset.current_revision
        if not dataset.revisions:
            # TODO: this is weird? Dataset without any data. Fetch error?
            return True
        elif current_revision:
            if identifier.files_last_modified:
                if current_revision.is_changed(identifier.files_last_modified):
                    return True
            files_last_modified = {
                file.file_id: file.last_modified
                for file in dataset_resource.files.values()
            }
            if current_revision.is_changed(files_last_modified):
                return True

            else:
                if (
                    identifier.last_modified
                    and current_revision.created_at < identifier.last_modified
                ):
                    return True
            # We don't set last_modified on Dataset level anymore, only on file level
            # else:
            #     if (
            #         identifier.last_modified
            #         and current_revision.created_at < identifier.last_modified
            #     ):
            #         return True

        return False
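To restate the new policy in isolation (a sketch under assumed names, not the authoritative flow): the fetch decision now keys off the DatasetResource instead of identifier attributes, with the per-file last_modified map built from the resource's FileResources.

    # Hypothetical walk-through; `dataset` comes from the store, or is None if unseen
    policy = FetchPolicy()
    if dataset is None:
        fetch = policy.should_fetch(dataset_resource)        # currently always True
    else:
        fetch = policy.should_refetch(dataset, dataset_resource)
        # True when the dataset has no revisions at all, or when the map
        # {file_id: last_modified} built from dataset_resource.files differs
        # from the current revision (current_revision.is_changed(...)).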
