Commit 0f993f2
Minor fixes
koenvo committed Sep 17, 2024
1 parent a48b1fe commit 0f993f2
Showing 4 changed files with 70 additions and 22 deletions.
ingestify/application/ingestion_engine.py (4 changes: 2 additions & 2 deletions)
@@ -21,8 +21,8 @@ def __init__(self, store: DatasetStore):
     def add_extract_job(self, extract_job: ExtractJob):
         self.loader.add_extract_job(extract_job)
 
-    def load(self):
-        self.loader.collect_and_run()
+    def load(self, dry_run: bool = False):
+        self.loader.collect_and_run(dry_run=dry_run)
 
     def list_datasets(self, as_count: bool = False):
         """Consider moving this to DataStore"""
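The new dry_run flag is pure plumbing at this level: IngestionEngine.load forwards it to Loader.collect_and_run, which (as the utils.py change below shows) hands it to the TaskExecutor. A minimal usage sketch; the get_engine import path and the config filename are assumptions for illustration, not part of this commit:

    # Hypothetical usage of the new flag; import path and config name are assumed.
    from ingestify.main import get_engine

    engine = get_engine("config.yaml", bucket=None)
    engine.load(dry_run=True)   # collect selectors and build tasks, store nothing
    engine.load()               # dry_run defaults to False: a normal ingestion run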
ingestify/application/loader.py (52 changes: 37 additions & 15 deletions)
@@ -33,7 +33,12 @@ def to_batches(input_):
         batches = [input_]
     else:
         # Assume it's an iterator. Peek what's inside, and put it back
-        peek = next(input_)
+        try:
+            peek = next(input_)
+        except StopIteration:
+            # Nothing to batch
+            return []
 
         input_ = itertools.chain([peek], input_)
 
         if not isinstance(peek, list):
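The try/except above fixes a crash on empty input: peeking with next() raises StopIteration when the iterator is exhausted, and that exception previously escaped to_batches. The peek-and-put-back idiom in isolation, as a standalone sketch (names are illustrative, not from this repo):

    import itertools

    def first_and_rest(iterable):
        # Peek at the first item; next() raises StopIteration when empty
        iterator = iter(iterable)
        try:
            first = next(iterator)
        except StopIteration:
            return None  # nothing to batch
        # Re-attach the peeked item so the caller still sees the full stream
        return first, itertools.chain([first], iterator)

    assert first_and_rest([]) is None
    first, rest = first_and_rest([1, 2, 3])
    assert first == 1 and list(rest) == [1, 2, 3]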
@@ -140,7 +145,7 @@ def __init__(self, store: DatasetStore):
     def add_extract_job(self, extract_job: ExtractJob):
         self.extract_jobs.append(extract_job)
 
-    def collect_and_run(self):
+    def collect_and_run(self, dry_run: bool = False):
         total_dataset_count = 0
 
         # First collect all selectors, before discovering datasets
@@ -154,7 +159,9 @@ def collect_and_run(self):
             dynamic_selectors = [
                 selector for selector in extract_job.selectors if selector.is_dynamic
             ]
-            if dynamic_selectors:
+
+            no_selectors = len(static_selectors) == 1 and not bool(static_selectors[0])
+            if dynamic_selectors or no_selectors:
                 if hasattr(extract_job.source, "discover_selectors"):
                     logger.debug(
                         f"Discovering selectors from {extract_job.source.__class__.__name__}"
@@ -165,29 +172,44 @@
                     all_selectors = extract_job.source.discover_selectors(
                         extract_job.dataset_type
                     )
-                    extra_static_selectors = []
-                    for dynamic_selector in dynamic_selectors:
-                        dynamic_job_selectors = [
+                    if no_selectors:
+                        # When there were no selectors specified, just use all of them
+                        extra_static_selectors = [
                             Selector.build(
                                 job_selector,
                                 data_spec_versions=extract_job.data_spec_versions,
                             )
                             for job_selector in all_selectors
-                            if dynamic_selector.is_match(job_selector)
                         ]
-                        extra_static_selectors.extend(dynamic_job_selectors)
-                        logger.info(f"Added {len(dynamic_job_selectors)} selectors")
+                        static_selectors = []
+                    else:
+                        extra_static_selectors = []
+                        for dynamic_selector in dynamic_selectors:
+                            dynamic_job_selectors = [
+                                Selector.build(
+                                    job_selector,
+                                    data_spec_versions=extract_job.data_spec_versions,
+                                )
+                                for job_selector in all_selectors
+                                if dynamic_selector.is_match(job_selector)
+                            ]
+                            extra_static_selectors.extend(dynamic_job_selectors)
+                            logger.info(f"Added {len(dynamic_job_selectors)} selectors")
+
                     static_selectors.extend(extra_static_selectors)
 
                     logger.info(
                         f"Discovered {len(extra_static_selectors)} selectors from {extract_job.source.__class__.__name__}"
                     )
                 else:
-                    raise ConfigurationError(
-                        f"Dynamic selectors cannot be used for "
-                        f"{extract_job.source.__class__.__name__} because it doesn't support"
-                        f" selector discovery"
-                    )
+                    if not no_selectors:
+                        # When there are no selectors and no discover_selectors, just pass it through. It might break
+                        # later on
+                        raise ConfigurationError(
+                            f"Dynamic selectors cannot be used for "
+                            f"{extract_job.source.__class__.__name__} because it doesn't support"
+                            f" selector discovery"
+                        )
 
             # Merge selectors when source, dataset_type and actual selector is the same. This makes
             # sure there will be only 1 dataset for this combination
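Stripped of logging and the ExtractJob plumbing, the new resolution logic has three cases: explicit static selectors are kept, dynamic selectors keep only the discovered selectors they match, and an empty selector list means "use everything discovered". A simplified sketch of that control flow (plain callables and strings stand in for Selector objects; only the branching mirrors the diff):

    def resolve_selectors(static_selectors, dynamic_matchers, discovered):
        # As in the diff: a single empty selector means none were configured
        no_selectors = len(static_selectors) == 1 and not bool(static_selectors[0])
        if no_selectors:
            return list(discovered)  # no selectors given: use all discovered ones
        extra = [d for is_match in dynamic_matchers for d in discovered if is_match(d)]
        return static_selectors + extra

    # One empty selector, no matchers -> everything discovered is used
    assert resolve_selectors([{}], [], ["a", "b"]) == ["a", "b"]
    # Static selectors are kept; a matcher pulls in matching discovered ones
    assert resolve_selectors(["x"], [lambda d: d == "b"], ["a", "b"]) == ["x", "b"]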
@@ -204,7 +226,7 @@ def run_task(task):
             logger.info(f"Running task {task}")
             task.run()
 
-        task_executor = TaskExecutor()
+        task_executor = TaskExecutor(dry_run=dry_run)
 
         for extract_job, selector in selectors.values():
             logger.debug(
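The merge mentioned in the trailing comment happens outside this hunk, but the idea is presumably to key each (extract_job, selector) pair on source, dataset type, and the selector itself so that duplicates collapse into a single dataset. A guess at that shape (the key construction and stub types are assumptions, not code from this commit):

    from collections import namedtuple

    ExtractJobStub = namedtuple("ExtractJobStub", "source dataset_type")

    def merge_selectors(pairs):
        # pairs: iterable of (extract_job, selector); later duplicates win
        merged = {}
        for extract_job, selector in pairs:
            key = (extract_job.source, extract_job.dataset_type, str(selector))
            merged[key] = (extract_job, selector)
        return merged

    job = ExtractJobStub(source="example_source", dataset_type="match")
    pairs = [(job, "competition_id=11"), (job, "competition_id=11")]
    assert len(merge_selectors(pairs)) == 1  # one dataset per combination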
ingestify/cmdline.py (14 changes: 11 additions & 3 deletions)
@@ -57,8 +57,16 @@ def cli():
     help="bucket",
     type=str,
 )
-@click.option("--debug", "debug", required=False, help="Debugging enabled", type=bool)
-def run(config_file: str, bucket: Optional[str], debug: Optional[bool]):
+# @click.option("--debug", "debug", required=False, help="Debugging enabled", type=bool)
+@click.option(
+    "--dry-run",
+    "dry_run",
+    required=False,
+    help="Dry run - don't store anything",
+    is_flag=True,
+    type=bool,
+)
+def run(config_file: str, bucket: Optional[str], dry_run: Optional[bool]):
     try:
         engine = get_engine(config_file, bucket)
     except ConfigurationError as e:
@@ -68,7 +76,7 @@ def run(config_file: str, bucket: Optional[str], debug: Optional[bool]):
         logger.exception(f"Failed due a configuration error: {e}")
         sys.exit(1)
 
-    engine.load()
+    engine.load(dry_run=dry_run)
 
     logger.info("Done")
 
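Because --dry-run is declared with is_flag=True, passing the bare flag sets dry_run to True and omitting it leaves it falsy, so engine.load(dry_run=dry_run) behaves exactly as before by default. One way to exercise the flag without touching real storage is click's test runner; a sketch, assuming the config option declared above this hunk is named --config (it is not shown here):

    from click.testing import CliRunner

    from ingestify.cmdline import run

    runner = CliRunner()
    # Assumed equivalent of: ingestify run --config config.yaml --dry-run
    result = runner.invoke(run, ["--config", "config.yaml", "--dry-run"])
    print(result.output)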
ingestify/utils.py (22 changes: 20 additions & 2 deletions)
@@ -1,5 +1,6 @@
 import abc
 import inspect
+import logging
 import os
 import time
 import re
@@ -16,6 +17,9 @@
 from itertools import islice
 
 
+logger = logging.getLogger(__name__)
+
+
 def chunker(it, size):
     iterator = iter(it)
     while chunk := list(islice(iterator, size)):
@@ -219,9 +223,23 @@ def close(self):
         return True
 
 
+class DummyPool:
+    def map_async(self, func, iterable):
+        logger.info(f"DummyPool: not running {len(list(iterable))} tasks")
+        return None
+
+    def join(self):
+        return True
+
+    def close(self):
+        return True
+
+
 class TaskExecutor:
-    def __init__(self, processes=0):
-        if os.environ.get("INGESTIFY_RUN_EAGER") == "true":
+    def __init__(self, processes=0, dry_run: bool = False):
+        if dry_run:
+            pool = DummyPool()
+        elif os.environ.get("INGESTIFY_RUN_EAGER") == "true":
             pool = SyncPool()
         else:
             if not processes:
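TaskExecutor now chooses between three pools: DummyPool when dry_run is set, SyncPool when INGESTIFY_RUN_EAGER=true, and a real worker pool otherwise. DummyPool satisfies the same map_async/close/join surface but only logs how many tasks it would have run. A small sketch of the dry-run path (logging is configured so the DummyPool message is visible):

    import logging

    from ingestify.utils import DummyPool, TaskExecutor

    logging.basicConfig(level=logging.INFO)

    # dry_run=True swaps the worker pool for DummyPool: tasks are counted
    # and logged, never executed
    executor = TaskExecutor(dry_run=True)

    pool = DummyPool()
    pool.map_async(print, [1, 2, 3])  # logs "DummyPool: not running 3 tasks"
    pool.close()
    pool.join()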
