From 3ddcdb49b6bfa012f50a05195738ec0a5befc398 Mon Sep 17 00:00:00 2001 From: Max Halford Date: Fri, 4 Oct 2024 14:58:08 +0200 Subject: [PATCH 1/2] Refactor logging --- lea/cli.py | 3 + lea/clients/bigquery.py | 9 ++ lea/clients/duckdb.py | 7 ++ lea/runner.py | 265 ++++++++++++++++++++-------------------- 4 files changed, 151 insertions(+), 133 deletions(-) diff --git a/lea/cli.py b/lea/cli.py index cc3d9b4..b9d9367 100644 --- a/lea/cli.py +++ b/lea/cli.py @@ -36,6 +36,7 @@ def validate_views_dir(views_dir: str): def prepare(views_dir: str = ViewsDir, production: bool = False, env: str = EnvPath): client = _make_client(production, wap_mode=False) runner = lea.Runner(views_dir=views_dir, client=client, verbose=True) + runner.log(repr(runner.client)) runner.prepare() @@ -71,6 +72,7 @@ def run( ): client = _make_client(production, wap_mode=wap) runner = lea.Runner(views_dir=views_dir, client=client, verbose=not silent and not print) + runner.log(repr(runner.client)) runner.run( select=select, freeze_unselected=freeze_unselected, @@ -95,6 +97,7 @@ def test( ): client = _make_client(production) runner = lea.Runner(views_dir=views_dir, client=client, verbose=True) + runner.log(repr(runner.client)) runner.test( select_views=select_views, freeze_unselected=freeze_unselected, diff --git a/lea/clients/bigquery.py b/lea/clients/bigquery.py index 21d6ce0..69d58fa 100644 --- a/lea/clients/bigquery.py +++ b/lea/clients/bigquery.py @@ -35,6 +35,15 @@ def __init__( self.username = username self.wap_mode = wap_mode + def __repr__(self): + return ( + "Running on BigQuery\n" + f"{self.dataset_name=}\n" + f"{self.location=}\n" + f"{self.write_project_id=}\n" + f"{self.compute_project_id=}" + ).replace("self.", "") + @property def dataset_name(self): return f"{self._dataset_name}_{self.username}" if self.username else self._dataset_name diff --git a/lea/clients/duckdb.py b/lea/clients/duckdb.py index eb4b266..9704319 100644 --- a/lea/clients/duckdb.py +++ b/lea/clients/duckdb.py @@ 
-30,6 +30,13 @@ def __init__(self, path: str, username: str | None = None, wap_mode: bool = Fals path_ = path_.parent / f"{path_.stem}_{username}{path_.suffix}" self.path_ = path_ + def __repr__(self): + return ( + "Running on DuckDB\n" + f"{self.path=}\n" + f"{self.username=}" + ).replace("self.", "") + def _make_con(self): import duckdb diff --git a/lea/runner.py b/lea/runner.py index abb30ee..fcc25c8 100644 --- a/lea/runner.py +++ b/lea/runner.py @@ -1,12 +1,14 @@ from __future__ import annotations import concurrent.futures +import dataclasses import datetime as dt import functools import io import pathlib import pickle import re +import sys import time import warnings @@ -20,10 +22,11 @@ console = rich.console.Console(force_interactive=True) -RUNNING = "[cyan]RUNNING" +RUNNING = "[white]RUNNING" SUCCESS = "[green]SUCCESS" ERRORED = "[red]ERRORED" SKIPPED = "[yellow]SKIPPED" +STOPPED = "[cyan]STOPPED" def _do_nothing(*args, **kwargs): @@ -38,6 +41,30 @@ def sizeof_fmt(num, suffix="B"): return f"{num:.1f}Yi{suffix}" +@dataclasses.dataclass +class RefreshJob: + future: concurrent.futures.Future + view: lea.views.View + started_at: dt.datetime = dataclasses.field(default_factory=dt.datetime.now) + finished_at: dt.datetime | None = None + skipped: bool = False + cancelled: bool = False + + @property + def status(self): + if self.cancelled: + return CANCELLED + if self.skipped: + return SKIPPED + if self.future.done(): + return SUCCESS if self.future.exception() is None else ERRORED + return RUNNING + + @property + def error(self) -> BaseException | None: + return self.future.exception() + + class Runner: def __init__(self, views_dir: pathlib.Path | str, client: lea.clients.Client, verbose=False): self.views_dir = pathlib.Path(views_dir) if isinstance(views_dir, str) else views_dir @@ -238,6 +265,30 @@ def _make_table_reference_mapping( def prepare(self): self.client.prepare(self.regular_views.values()) + def _make_job( + self, + view: lea.views.View, + executor: 
concurrent.futures.Executor, + dry: bool, + print_views: bool, + ) -> RefreshJob: + if dry: + func = _do_nothing + elif print_views: + func = functools.partial( + console.print, + view + ) + else: + func = functools.partial( + self.client.materialize_view, + view=view + ) + return RefreshJob( + future=executor.submit(func), + view=view + ) + def run( self, select: list[str], @@ -272,132 +323,90 @@ def run( self.client.delete_table_reference(existing_view_keys[view_key]) self.log(f"Removed {'.'.join(view_key)}") - def display_progress() -> rich.table.Table | None: - if not self.verbose: - return None - table = rich.table.Table(box=None) - table.add_column("#") - table.add_column("view") - table.add_column("status") - table.add_column("duration", justify="right") - table.add_column("cost", justify="right") - - def get_status(view_key): - if view_key in exceptions: - return ERRORED - elif view_key in skipped: - return SKIPPED - elif view_key in jobs_ended_at: - return SUCCESS - return RUNNING - - not_done = [view_key for view_key in execution_order if view_key not in cache] - statuses = {view_key: get_status(view_key) for view_key in not_done} - not_done = [view_key for view_key in not_done if statuses[view_key] != RUNNING] + [ - view_key for view_key in not_done if statuses[view_key] == RUNNING - ] - for i, view_key in list(enumerate(not_done, start=1))[-show:]: - status = statuses[view_key] - duration = ( - (jobs_ended_at.get(view_key, dt.datetime.now()) - jobs_started_at[view_key]) - if view_key in jobs_started_at - else None - ) - # Round to the closest second - duration_str = f"{int(round(duration.total_seconds()))}s" if duration else "" - result = jobs[view_key].result() if status == SUCCESS else None - cost = result.cost if result else None - table.add_row( - str(i) if status != RUNNING else "", - str(self.views[view_key]), - status, - duration_str, - "" if cost is None else f"${cost:,.5f}", - ) - - return table - executor = 
concurrent.futures.ThreadPoolExecutor(max_workers=threads) - jobs = {} - execution_order = [] - jobs_started_at = {} - jobs_ended_at = {} - exceptions = {} - skipped = set() + jobs: dict[tuple[str, ...], RefreshJob] = {} cache_path = pathlib.Path(".cache.pkl") cache = set() if fresh or not cache_path.exists() else pickle.loads(cache_path.read_bytes()) + stop = False tic = time.time() if cache: self.log(f"{len(cache):,d} views already done") - with rich.live.Live(display_progress(), vertical_overflow="ellipsis") as live: - while self.dag.is_active(): - for view_key in self.dag.get_ready(): - # Check if the view_key can be skipped or not - if view_key not in selected_view_keys: - self.dag.done(view_key) - continue - execution_order.append(view_key) - - # A view can only be computed if all its dependencies have been computed - # succesfully - - if any( - dep_key in skipped or dep_key in exceptions - for dep_key in self.views[view_key].dependent_view_keys - ): - skipped.add(view_key) - self.dag.done(view_key) - continue - - # Submit a job, or print, or do nothing - if dry or view_key in cache: - job = _do_nothing - elif print_views: - job = functools.partial( - console.print, - self.views[view_key].with_context( - table_reference_mapping=table_reference_mapping - ), - ) - else: - job = functools.partial( - self.client.materialize_view, - view=self.views[view_key].with_context( - table_reference_mapping=table_reference_mapping - ), - ) - jobs[view_key] = executor.submit(job) - jobs_started_at[view_key] = dt.datetime.now() - - # Check if any jobs are done. We notify the DAG by calling done when a job is done, - # which will unlock the next views. 
- for view_key in jobs_started_at: - if view_key not in jobs_ended_at and jobs[view_key].done(): - self.dag.done(view_key) - jobs_ended_at[view_key] = dt.datetime.now() - # Determine whether the job succeeded or not - if exception := jobs[view_key].exception(): - exceptions[view_key] = exception - if fail_fast: - raise RuntimeError( - f"Error in {self.views[view_key]}" - ) from exception - - if (progress := display_progress()) is not None: - live.update(progress) + while self.dag.is_active(): + + if stop: + for job in jobs.values(): + if job.status == RUNNING: + job.future.cancel() + job.cancelled = True + job.finished_at = dt.datetime.now() + console.log(f"{STOPPED} {job.view}") + break + + # Start new jobs + for view_key in self.dag.get_ready(): + + if view_key in jobs and jobs[view_key].finished_at is not None: + continue + + # Check if the view_key can be skipped or not + if view_key not in selected_view_keys: + self.dag.done(view_key) + continue + + # We can't refresh a view if its dependencies had errors (or were skipped + # because their dependencies had errors) + if any( + jobs[dep_key].status in {ERRORED, SKIPPED} + for dep_key in self.views[view_key].dependent_view_keys + ): + self.dag.done(view_key) + console.log(f"{SKIPPED} {self.views[view_key]}") + continue + + # Submit a job, or print, or do nothing + job = self._make_job( + view=self.views[view_key].with_context( + table_reference_mapping=table_reference_mapping + ), + executor=executor, + dry=dry or view_key in cache, + print_views=print_views, + ) + jobs[view_key] = job + console.log(f"{RUNNING} {job.view}") + + # Check if any jobs are done + unfinished_jobs = (job for job in jobs.values() if job.finished_at is None) + for job in unfinished_jobs: + if job.status == RUNNING: + if (duration := dt.datetime.now() - job.started_at) > dt.timedelta(seconds=10): + duration_str = f"{round(duration.total_seconds(), 1)}s" + console.log(f"{RUNNING} {job.view} after {duration_str}") + continue + 
job.finished_at = dt.datetime.now() + self.dag.done(job.view.key) + if job.status == ERRORED: + console.log(f"{ERRORED} {job.view}", style="red") + if fail_fast: + stop = True + break + if job.status == SUCCESS: + duration = (job.finished_at - job.started_at) + duration_str = f"{round(duration.total_seconds(), 1)}s" + console.log(f"{SUCCESS} {job.view} in {duration_str}") # Save the cache - all_done = not exceptions and not skipped + all_jobs_succeeded = all(job.status == SUCCESS for job in jobs.values()) cache = ( set() - if all_done + if all_jobs_succeeded else cache | { view_key - for view_key in execution_order - if view_key not in exceptions and view_key not in skipped + for view_key, job in jobs.items() + if job.status == SUCCESS } ) if cache: @@ -408,30 +417,21 @@ def get_status(view_key): # Summary statistics if not self.verbose: return - self.log(f"Took {round(time.time() - tic)}s") - summary = rich.table.Table() - summary.add_column("status") - summary.add_column("count") - if n := len(jobs_ended_at) - len(exceptions): - summary.add_row(SUCCESS, f"{n:,d}") - if n := len(exceptions): - summary.add_row(ERRORED, f"{n:,d}") - if n := len(skipped): - summary.add_row(SKIPPED, f"{n:,d}") - self.print(summary) + self.log(f"Finished in {round(time.time() - tic, 1)}s") # Summary of errors - if exceptions: - for view_key, exception in exceptions.items(): - self.print(str(self.views[view_key]), style="bold red") - self.print(exception) - - if fail_fast: - raise Exception("Some views failed to build") + at_least_one_error = False + for job in jobs.values(): + if job.error: + at_least_one_error = True + self.print(str(job.view), style="bold red") + self.print(job.error) + if at_least_one_error: + return sys.exit(1) # In WAP mode, the tables gets created with a suffix to mimic a staging environment. We # need to switch the tables to the production environment. 
- if self.client.wap_mode and not exceptions and not dry: + if self.client.wap_mode and all_jobs_succeeded and not dry: # In WAP mode, we want to guarantee the new tables are correct. Therefore, we run tests # on them before switching. self.test( @@ -508,8 +508,7 @@ def test(self, select_views: list[str], freeze_unselected: bool, threads: int, f self.log(f"FAILURE {test}", style="bold red") self.log(conflicts.head()) if fail_fast: - # TODO: print out the query to help quick debugging - raise RuntimeError(f"Test {test} failed") + return sys.exit(1) def make_docs(self, output_dir: str): output_dir_path = pathlib.Path(output_dir) From dad72784e075ce4af13873adb2aeea016849c534 Mon Sep 17 00:00:00 2001 From: Max Halford Date: Fri, 4 Oct 2024 14:59:16 +0200 Subject: [PATCH 2/2] ruff --- lea/clients/duckdb.py | 6 +----- lea/runner.py | 34 +++++++++------------------------- 2 files changed, 10 insertions(+), 30 deletions(-) diff --git a/lea/clients/duckdb.py b/lea/clients/duckdb.py index 9704319..c839787 100644 --- a/lea/clients/duckdb.py +++ b/lea/clients/duckdb.py @@ -31,11 +31,7 @@ def __init__(self, path: str, username: str | None = None, wap_mode: bool = Fals self.path_ = path_ def __repr__(self): - return ( - "Running on DuckDB\n" - f"{self.path=}\n" - f"{self.username=}" - ).replace("self.", "") + return ("Running on DuckDB\n" f"{self.path=}\n" f"{self.username=}").replace("self.", "") def _make_con(self): import duckdb diff --git a/lea/runner.py b/lea/runner.py index fcc25c8..2bf47ca 100644 --- a/lea/runner.py +++ b/lea/runner.py @@ -48,12 +48,12 @@ class RefreshJob: started_at: dt.datetime = dataclasses.field(default_factory=dt.datetime.now) finished_at: dt.datetime | None = None skipped: bool = False - cancelled: bool = False + stopped: bool = False @property def status(self): - if self.cancelled: - return CANCELLED + if self.stopped: + return STOPPED if self.skipped: return SKIPPED if self.future.done(): @@ -275,19 +275,10 @@ def _make_job( if dry: func = 
_do_nothing elif print_views: - func = functools.partial( - console.print, - view - ) + func = functools.partial(console.print, view) else: - func = functools.partial( - self.client.materialize_view, - view=view - ) - return RefreshJob( - future=executor.submit(func), - view=view - ) + func = functools.partial(self.client.materialize_view, view=view) + return RefreshJob(future=executor.submit(func), view=view) def run( self, @@ -334,19 +325,17 @@ def run( self.log(f"{len(cache):,d} views already done") while self.dag.is_active(): - if stop: for job in jobs.values(): if job.status == RUNNING: job.future.cancel() - job.cancelled = True + job.stopped = True job.finished_at = dt.datetime.now() console.log(f"{STOPPED} {job.view}") break # Start new jobs for view_key in self.dag.get_ready(): - if view_key in jobs and jobs[view_key].finished_at is not None: continue @@ -393,7 +382,7 @@ def run( stop = True break if job.status == SUCCESS: - duration = (job.finished_at - job.started_at) + duration = job.finished_at - job.started_at duration_str = f"{round(duration.total_seconds(), 1)}s" console.log(f"{SUCCESS} {job.view} in {duration_str}") @@ -402,12 +391,7 @@ def run( cache = ( set() if all_jobs_succeeded - else cache - | { - view_key - for view_key, job in jobs.items() - if job.status == SUCCESS - } + else cache | {view_key for view_key, job in jobs.items() if job.status == SUCCESS} ) if cache: cache_path.write_bytes(pickle.dumps(cache))