Commit

Merge branch 'main' of github.com:PySport/ingestify
koenvo committed Aug 16, 2024
2 parents 7defccc + 78417ff commit 692c6b5
Showing 16 changed files with 481 additions and 89 deletions.
9 changes: 8 additions & 1 deletion ingestify/application/dataset_store.py
@@ -66,6 +66,13 @@ def get_dataset_collection(
if isinstance(selector, dict):
# By-pass the build as we don't want to specify data_spec_versions here... (for now)
selector = Selector(selector)
elif isinstance(selector, list):
if not selector:
return DatasetCollection()

if isinstance(selector[0], dict):
# Convert all selector dicts to Selectors
selector = [Selector(_) for _ in selector]

dataset_collection = self.dataset_repository.get_dataset_collection(
bucket=self.bucket,
@@ -248,7 +255,7 @@ def load_files(
dataset: Dataset,
data_feed_keys: Optional[List[str]] = None,
lazy: bool = False,
auto_rewind: bool = True
auto_rewind: bool = True,
) -> FileCollection:
current_revision = dataset.current_revision
files = {}
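
For context, get_dataset_collection now also accepts a list of selectors (or plain dicts), as used by the batched loader further down. A usage sketch, assuming `store` is an already-configured DatasetStore and that "match", "example-provider", competition_id and season_id are made-up values for illustration:

    datasets = store.get_dataset_collection(
        dataset_type="match",
        provider="example-provider",
        selector=[
            {"competition_id": 1, "season_id": 2021},  # hypothetical selector attributes
            {"competition_id": 1, "season_id": 2022},
        ],
    )
    # An empty list short-circuits to an empty DatasetCollection
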
139 changes: 78 additions & 61 deletions ingestify/application/loader.py
@@ -4,7 +4,7 @@
from typing import List

from ingestify.domain.models import Dataset, Identifier, Selector, Source, Task, TaskSet
from ingestify.utils import map_in_pool
from ingestify.utils import map_in_pool, TaskExecutor

from .dataset_store import DatasetStore
from ..domain.models.data_spec_version_collection import DataSpecVersionCollection
@@ -94,8 +94,6 @@ def add_extract_job(self, extract_job: ExtractJob):
self.extract_jobs.append(extract_job)

def collect_and_run(self):
task_set = TaskSet()

total_dataset_count = 0

# First collect all selectors, before discovering datasets
@@ -155,77 +153,96 @@ def collect_and_run(self):
else:
selectors[key] = (extract_job, selector)

def run_task(task):
logger.info(f"Running task {task}")
task.run()

task_executor = TaskExecutor()

for extract_job, selector in selectors.values():
logger.debug(
f"Discovering datasets from {extract_job.source.__class__.__name__} using selector {selector}"
)
dataset_identifiers = [
Identifier.create_from(selector, **identifier)
# We have to pass the data_spec_versions here as a Source can add some
# extra data to the identifier which is retrieved in a certain data format
for identifier in extract_job.source.discover_datasets(
dataset_type=extract_job.dataset_type,
data_spec_versions=selector.data_spec_versions,
**selector.filtered_attributes,
)
]

task_subset = TaskSet()

dataset_collection = self.store.get_dataset_collection(
dataset_collection_metadata = self.store.get_dataset_collection(
dataset_type=extract_job.dataset_type,
provider=extract_job.source.provider,
data_spec_versions=selector.data_spec_versions,
selector=selector,
metadata_only=True,
).metadata

# There are two different, but similar flows here:
# 1. The discover_datasets returns a list, and the entire list can be processed at once
# 2. The discover_datasets returns an iterator of batches, in this case we need to process each batch
discovered_datasets = extract_job.source.discover_datasets(
dataset_type=extract_job.dataset_type,
data_spec_versions=selector.data_spec_versions,
dataset_collection_metadata=dataset_collection_metadata,
**selector.filtered_attributes,
)

skip_count = 0
total_dataset_count += len(dataset_identifiers)

for dataset_identifier in dataset_identifiers:
if dataset := dataset_collection.get(dataset_identifier):
if extract_job.fetch_policy.should_refetch(
dataset, dataset_identifier
):
task_subset.add(
UpdateDatasetTask(
source=extract_job.source,
dataset=dataset, # Current dataset from the database
dataset_identifier=dataset_identifier, # Most recent dataset_identifier
data_spec_versions=selector.data_spec_versions,
store=self.store,
if isinstance(discovered_datasets, list):
batches = [discovered_datasets]
else:
batches = discovered_datasets

for batch in batches:
dataset_identifiers = [
Identifier.create_from(selector, **identifier)
# We have to pass the data_spec_versions here as a Source can add some
# extra data to the identifier which is retrieved in a certain data format
for identifier in batch
]

# Load all available datasets based on the discovered dataset identifiers
dataset_collection = self.store.get_dataset_collection(
dataset_type=extract_job.dataset_type,
provider=extract_job.source.provider,
selector=dataset_identifiers,
)

skip_count = 0
total_dataset_count += len(dataset_identifiers)

task_set = TaskSet()
for dataset_identifier in dataset_identifiers:
if dataset := dataset_collection.get(dataset_identifier):
if extract_job.fetch_policy.should_refetch(
dataset, dataset_identifier
):
task_set.add(
UpdateDatasetTask(
source=extract_job.source,
dataset=dataset, # Current dataset from the database
dataset_identifier=dataset_identifier, # Most recent dataset_identifier
data_spec_versions=selector.data_spec_versions,
store=self.store,
)
)
)
else:
skip_count += 1
else:
skip_count += 1
else:
if extract_job.fetch_policy.should_fetch(dataset_identifier):
task_subset.add(
CreateDatasetTask(
source=extract_job.source,
dataset_type=extract_job.dataset_type,
dataset_identifier=dataset_identifier,
data_spec_versions=selector.data_spec_versions,
store=self.store,
if extract_job.fetch_policy.should_fetch(dataset_identifier):
task_set.add(
CreateDatasetTask(
source=extract_job.source,
dataset_type=extract_job.dataset_type,
dataset_identifier=dataset_identifier,
data_spec_versions=selector.data_spec_versions,
store=self.store,
)
)
)
else:
skip_count += 1

logger.info(
f"Discovered {len(dataset_identifiers)} datasets from {extract_job.source.__class__.__name__} "
f"using selector {selector} => {len(task_subset)} tasks. {skip_count} skipped."
)
else:
skip_count += 1

task_set += task_subset
logger.info(
f"Discovered {len(dataset_identifiers)} datasets from {extract_job.source.__class__.__name__} "
f"using selector {selector} => {len(task_set)} tasks. {skip_count} skipped."
)

if len(task_set):
processes = cpu_count()
logger.info(f"Scheduled {len(task_set)} tasks. With {processes} processes")
task_executor.run(run_task, task_set)
logger.info(f"Scheduled {len(task_set)} tasks")

def run_task(task):
logger.info(f"Running task {task}")
task.run()
task_executor.join()

map_in_pool(run_task, task_set)
else:
logger.info("Nothing to do.")
logger.info("Done")
14 changes: 12 additions & 2 deletions ingestify/domain/models/dataset/collection.py
@@ -1,16 +1,26 @@
from typing import List
from typing import List, Optional

from .collection_metadata import DatasetCollectionMetadata
from .dataset import Dataset
from .identifier import Identifier


class DatasetCollection:
def __init__(self, datasets: List[Dataset] = None):
def __init__(
self,
metadata: Optional[DatasetCollectionMetadata] = None,
datasets: Optional[List[Dataset]] = None,
):
datasets = datasets or []

# TODO: this fails when datasets contains different dataset_types with overlapping identifiers
self.datasets: dict[str, Dataset] = {
dataset.identifier.key: dataset for dataset in datasets
}
self.metadata = metadata

def loaded(self):
return self.metadata.count == len(self.datasets)

def get(self, dataset_identifier: Identifier) -> Dataset:
return self.datasets.get(dataset_identifier.key)
9 changes: 9 additions & 0 deletions ingestify/domain/models/dataset/collection_metadata.py
@@ -0,0 +1,9 @@
from dataclasses import dataclass
from datetime import datetime
from typing import Optional


@dataclass
class DatasetCollectionMetadata:
last_modified: Optional[datetime]
count: int
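
A small illustration of how the new metadata ties into DatasetCollection.loaded(); the values are invented, only the classes and their fields come from the two files above:

    from datetime import datetime, timezone

    from ingestify.domain.models.dataset.collection import DatasetCollection
    from ingestify.domain.models.dataset.collection_metadata import DatasetCollectionMetadata

    metadata = DatasetCollectionMetadata(
        last_modified=datetime(2024, 8, 1, tzinfo=timezone.utc),
        count=2,
    )

    # A metadata-only query (metadata_only=True) carries counts but no datasets
    collection = DatasetCollection(metadata=metadata)
    assert collection.loaded() is False  # 2 datasets expected, 0 actually loaded
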
3 changes: 2 additions & 1 deletion ingestify/domain/models/dataset/dataset_repository.py
@@ -18,7 +18,8 @@ def get_dataset_collection(
dataset_type: Optional[str] = None,
dataset_id: Optional[Union[str, List[str]]] = None,
provider: Optional[str] = None,
selector: Optional[Selector] = None,
selector: Optional[Union[Selector, List[Selector]]] = None,
metadata_only: bool = False,
) -> DatasetCollection:
pass

2 changes: 1 addition & 1 deletion ingestify/domain/models/dataset/file.py
@@ -28,9 +28,9 @@ class DraftFile:
def from_input(
cls,
file_,
data_serialization_format="txt",
data_feed_key=None,
data_spec_version=None,
data_serialization_format=None,
modified_at=None,
):
# Pass-through for these types
2 changes: 1 addition & 1 deletion ingestify/domain/models/dataset/file_collection.py
@@ -13,7 +13,7 @@ def get_file(
self,
data_feed_key: Optional[str] = None,
data_spec_version: Optional[str] = None,
auto_rewind: Optional[bool] = None
auto_rewind: Optional[bool] = None,
) -> Optional[LoadedFile]:
if not data_feed_key and not data_spec_version:
raise ValueError(
11 changes: 8 additions & 3 deletions ingestify/domain/models/source.py
@@ -1,11 +1,12 @@
from abc import ABC, abstractmethod
from typing import Dict, List, Optional
from typing import Dict, List, Optional, Iterable, Iterator, Union

# from ingestify.utils import ComponentFactory, ComponentRegistry

from . import DraftFile
from .data_spec_version_collection import DataSpecVersionCollection
from .dataset import Identifier, Revision
from .dataset.collection_metadata import DatasetCollectionMetadata


class Source(ABC):
@@ -24,8 +25,12 @@ def provider(self) -> str:

@abstractmethod
def discover_datasets(
self, dataset_type: str, data_spec_versions: DataSpecVersionCollection, **kwargs
) -> List[Dict]:
self,
dataset_type: str,
data_spec_versions: DataSpecVersionCollection,
dataset_collection_metadata: DatasetCollectionMetadata,
**kwargs
) -> Union[List[Dict], Iterator[List[Dict]]]:
pass

@abstractmethod
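
To show how the extended signature can be used, here is a hypothetical Source that yields identifiers in batches; the class name, provider string, page loop and match_id attribute are all invented, and only the method signature and the Iterator[List[Dict]] return shape come from the diff above (other abstract methods of Source are omitted from the sketch):

    from typing import Dict, Iterator, List

    from ingestify.domain.models import Source
    from ingestify.domain.models.data_spec_version_collection import DataSpecVersionCollection
    from ingestify.domain.models.dataset.collection_metadata import DatasetCollectionMetadata


    class ExampleSource(Source):
        @property
        def provider(self) -> str:
            return "example"

        def discover_datasets(
            self,
            dataset_type: str,
            data_spec_versions: DataSpecVersionCollection,
            dataset_collection_metadata: DatasetCollectionMetadata,
            **kwargs,
        ) -> Iterator[List[Dict]]:
            # A real source could use this to fetch only what changed since the last run
            since = dataset_collection_metadata.last_modified

            # Yield identifiers page by page so the loader can persist datasets and
            # schedule tasks per batch instead of holding everything in memory
            for page in range(2):
                yield [{"match_id": page * 10 + i} for i in range(10)]
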
3 changes: 3 additions & 0 deletions ingestify/infra/store/dataset/local_dataset_repository.py
@@ -33,6 +33,9 @@ def supports(cls, url: str) -> bool:

def __init__(self, url: str):
self.base_dir = Path(url[7:])
raise DeprecationWarning(
"This Repository should not be used. Better use SqlAlchemyDatasetRepository with a local sqlite database."
)

def get_dataset_collection(
self,