diff --git a/ingestify/application/ingestion_engine.py b/ingestify/application/ingestion_engine.py
index f649a54..ead1859 100644
--- a/ingestify/application/ingestion_engine.py
+++ b/ingestify/application/ingestion_engine.py
@@ -21,8 +21,8 @@ def __init__(self, store: DatasetStore):
     def add_extract_job(self, extract_job: ExtractJob):
         self.loader.add_extract_job(extract_job)
 
-    def load(self):
-        self.loader.collect_and_run()
+    def load(self, dry_run: bool = False):
+        self.loader.collect_and_run(dry_run=dry_run)
 
     def list_datasets(self, as_count: bool = False):
         """Consider moving this to DataStore"""
diff --git a/ingestify/application/loader.py b/ingestify/application/loader.py
index 4b69b14..5960034 100644
--- a/ingestify/application/loader.py
+++ b/ingestify/application/loader.py
@@ -33,7 +33,12 @@ def to_batches(input_):
         batches = [input_]
     else:
         # Assume it's an iterator. Peek what's inside, and put it back
-        peek = next(input_)
+        try:
+            peek = next(input_)
+        except StopIteration:
+            # Nothing to batch
+            return []
+
         input_ = itertools.chain([peek], input_)
 
         if not isinstance(peek, list):
@@ -140,7 +145,7 @@ def __init__(self, store: DatasetStore):
     def add_extract_job(self, extract_job: ExtractJob):
         self.extract_jobs.append(extract_job)
 
-    def collect_and_run(self):
+    def collect_and_run(self, dry_run: bool = False):
         total_dataset_count = 0
 
         # First collect all selectors, before discovering datasets
@@ -154,7 +159,9 @@ def collect_and_run(self):
             dynamic_selectors = [
                 selector for selector in extract_job.selectors if selector.is_dynamic
             ]
-            if dynamic_selectors:
+
+            no_selectors = len(static_selectors) == 1 and not bool(static_selectors[0])
+            if dynamic_selectors or no_selectors:
                 if hasattr(extract_job.source, "discover_selectors"):
                     logger.debug(
                         f"Discovering selectors from {extract_job.source.__class__.__name__}"
@@ -165,29 +172,44 @@
                     all_selectors = extract_job.source.discover_selectors(
                         extract_job.dataset_type
                     )
-                    extra_static_selectors = []
-                    for dynamic_selector in dynamic_selectors:
-                        dynamic_job_selectors = [
+                    if no_selectors:
+                        # When there were no selectors specified, just use all of them
+                        extra_static_selectors = [
                             Selector.build(
                                 job_selector,
                                 data_spec_versions=extract_job.data_spec_versions,
                             )
                             for job_selector in all_selectors
-                            if dynamic_selector.is_match(job_selector)
                         ]
-                        extra_static_selectors.extend(dynamic_job_selectors)
-                        logger.info(f"Added {len(dynamic_job_selectors)} selectors")
+                        static_selectors = []
+                    else:
+                        extra_static_selectors = []
+                        for dynamic_selector in dynamic_selectors:
+                            dynamic_job_selectors = [
+                                Selector.build(
+                                    job_selector,
+                                    data_spec_versions=extract_job.data_spec_versions,
+                                )
+                                for job_selector in all_selectors
+                                if dynamic_selector.is_match(job_selector)
+                            ]
+                            extra_static_selectors.extend(dynamic_job_selectors)
+                            logger.info(f"Added {len(dynamic_job_selectors)} selectors")
 
                     static_selectors.extend(extra_static_selectors)
+
                     logger.info(
                         f"Discovered {len(extra_static_selectors)} selectors from {extract_job.source.__class__.__name__}"
                     )
                 else:
-                    raise ConfigurationError(
-                        f"Dynamic selectors cannot be used for "
-                        f"{extract_job.source.__class__.__name__} because it doesn't support"
-                        f" selector discovery"
-                    )
+                    if not no_selectors:
+                        # When there are no selectors and no discover_selectors, just pass it through. It might break
+                        # later on
+                        raise ConfigurationError(
+                            f"Dynamic selectors cannot be used for "
+                            f"{extract_job.source.__class__.__name__} because it doesn't support"
+                            f" selector discovery"
+                        )
 
         # Merge selectors when source, dataset_type and actual selector is the same. This makes
         # sure there will be only 1 dataset for this combination
@@ -204,7 +226,7 @@ def run_task(task):
             logger.info(f"Running task {task}")
             task.run()
 
-        task_executor = TaskExecutor()
+        task_executor = TaskExecutor(dry_run=dry_run)
 
         for extract_job, selector in selectors.values():
             logger.debug(
diff --git a/ingestify/cmdline.py b/ingestify/cmdline.py
index f82a39e..b242f65 100644
--- a/ingestify/cmdline.py
+++ b/ingestify/cmdline.py
@@ -57,8 +57,16 @@ def cli():
     help="bucket",
     type=str,
 )
-@click.option("--debug", "debug", required=False, help="Debugging enabled", type=bool)
-def run(config_file: str, bucket: Optional[str], debug: Optional[bool]):
+# @click.option("--debug", "debug", required=False, help="Debugging enabled", type=bool)
+@click.option(
+    "--dry-run",
+    "dry_run",
+    required=False,
+    help="Dry run - don't store anything",
+    is_flag=True,
+    type=bool,
+)
+def run(config_file: str, bucket: Optional[str], dry_run: Optional[bool]):
     try:
         engine = get_engine(config_file, bucket)
     except ConfigurationError as e:
@@ -68,7 +76,7 @@ def run(config_file: str, bucket: Optional[str], debug: Optional[bool]):
         logger.exception(f"Failed due a configuration error: {e}")
         sys.exit(1)
 
-    engine.load()
+    engine.load(dry_run=dry_run)
 
     logger.info("Done")
 
diff --git a/ingestify/utils.py b/ingestify/utils.py
index d5d4742..a1dfe32 100644
--- a/ingestify/utils.py
+++ b/ingestify/utils.py
@@ -1,5 +1,6 @@
 import abc
 import inspect
+import logging
 import os
 import time
 import re
@@ -16,6 +17,9 @@
 from itertools import islice
 
 
+logger = logging.getLogger(__name__)
+
+
 def chunker(it, size):
     iterator = iter(it)
     while chunk := list(islice(iterator, size)):
@@ -219,9 +223,23 @@ def close(self):
         return True
 
 
+class DummyPool:
+    def map_async(self, func, iterable):
+        logger.info(f"DummyPool: not running {len(list(iterable))} tasks")
+        return None
+
+    def join(self):
+        return True
+
+    def close(self):
+        return True
+
+
 class TaskExecutor:
-    def __init__(self, processes=0):
-        if os.environ.get("INGESTIFY_RUN_EAGER") == "true":
+    def __init__(self, processes=0, dry_run: bool = False):
+        if dry_run:
+            pool = DummyPool()
+        elif os.environ.get("INGESTIFY_RUN_EAGER") == "true":
             pool = SyncPool()
         else:
             if not processes:
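
The guard added to to_batches matters because next() on an exhausted iterator raises StopIteration, which previously escaped straight through the function when a source yielded no datasets. A minimal sketch of the fixed control flow; the tail of the real function is outside this hunk, so everything after the itertools.chain line is an assumption:

    import itertools

    def to_batches(input_):
        if isinstance(input_, list):
            # A plain list is treated as a single batch
            return [input_]
        # Assume it's an iterator. Peek what's inside, and put it back
        try:
            peek = next(input_)
        except StopIteration:
            # Nothing to batch: empty input no longer raises
            return []
        input_ = itertools.chain([peek], input_)
        if not isinstance(peek, list):
            # Loose items: wrap the whole stream into one batch (assumed;
            # not shown in this diff)
            return [list(input_)]
        # Already a stream of batches (assumed)
        return list(input_)

    assert to_batches(iter([])) == []            # used to raise StopIteration
    assert to_batches(iter([1, 2])) == [[1, 2]]
    assert to_batches([1, 2]) == [[1, 2]]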
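
The no_selectors expression encodes an assumption worth spelling out: an ExtractJob configured without any selectors apparently still arrives carrying exactly one empty, falsy Selector, and such a job now falls back to discover_selectors and ingests everything the source exposes instead of raising. A toy illustration of that emptiness test, with a dict standing in for the empty Selector (the real class is not shown in this diff):

    # One empty, falsy selector means: the job specified no selectors at all
    static_selectors = [{}]  # stand-in for [Selector()]
    no_selectors = len(static_selectors) == 1 and not bool(static_selectors[0])
    assert no_selectors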
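
End to end, the flag threads cmdline run() -> IngestionEngine.load -> Loader.collect_and_run -> TaskExecutor(dry_run=dry_run), where DummyPool replaces the real pool and only counts the tasks it would have run. A self-contained sketch of that swap under stated assumptions: SyncPool here is a simplification of the eager pool in utils.py, and the run method is hypothetical, standing in for however the executor actually dispatches:

    import logging

    logger = logging.getLogger(__name__)


    class SyncPool:
        # Simplified eager pool: runs every task in-process
        def map_async(self, func, iterable):
            for item in iterable:
                func(item)

        def join(self):
            return True

        def close(self):
            return True


    class DummyPool:
        # Mirrors the patch: count the tasks, run none of them
        def map_async(self, func, iterable):
            # list() also drains generator input so the count is accurate;
            # safe here because the tasks are never executed
            logger.info(f"DummyPool: not running {len(list(iterable))} tasks")
            return None

        def join(self):
            return True

        def close(self):
            return True


    class TaskExecutor:
        def __init__(self, dry_run: bool = False):
            self.pool = DummyPool() if dry_run else SyncPool()

        def run(self, func, tasks):  # hypothetical dispatch method
            self.pool.map_async(func, tasks)
            self.pool.close()
            self.pool.join()


    logging.basicConfig(level=logging.INFO)
    TaskExecutor(dry_run=True).run(print, ["task-1", "task-2"])   # logs "not running 2 tasks"
    TaskExecutor(dry_run=False).run(print, ["task-1", "task-2"])  # prints both tasks

From the shell the new option is a plain switch and needs no value; assuming the console script is called ingestify and the config-file option is --config (both defined outside this diff):

    ingestify run --config config.yaml --dry-run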