Refactor logging #50

Merged 2 commits on Oct 4, 2024
3 changes: 3 additions & 0 deletions lea/cli.py
@@ -36,6 +36,7 @@ def validate_views_dir(views_dir: str):
def prepare(views_dir: str = ViewsDir, production: bool = False, env: str = EnvPath):
client = _make_client(production, wap_mode=False)
runner = lea.Runner(views_dir=views_dir, client=client, verbose=True)
runner.log(repr(runner.client))
runner.prepare()


@@ -71,6 +72,7 @@ def run(
):
client = _make_client(production, wap_mode=wap)
runner = lea.Runner(views_dir=views_dir, client=client, verbose=not silent and not print)
runner.log(repr(runner.client))
runner.run(
select=select,
freeze_unselected=freeze_unselected,
@@ -95,6 +97,7 @@ def test(
):
client = _make_client(production)
runner = lea.Runner(views_dir=views_dir, client=client, verbose=True)
runner.log(repr(runner.client))
runner.test(
select_views=select_views,
freeze_unselected=freeze_unselected,
9 changes: 9 additions & 0 deletions lea/clients/bigquery.py
@@ -35,6 +35,15 @@ def __init__(
self.username = username
self.wap_mode = wap_mode

def __repr__(self):
return (
"Running on BigQuery\n"
f"{self.dataset_name=}\n"
f"{self.location=}\n"
f"{self.write_project_id=}\n"
f"{self.compute_project_id=}"
).replace("self.", "")

@property
def dataset_name(self):
return f"{self._dataset_name}_{self.username}" if self.username else self._dataset_name
3 changes: 3 additions & 0 deletions lea/clients/duckdb.py
@@ -30,6 +30,9 @@ def __init__(self, path: str, username: str | None = None, wap_mode: bool = Fals
path_ = path_.parent / f"{path_.stem}_{username}{path_.suffix}"
self.path_ = path_

def __repr__(self):
return ("Running on DuckDB\n" f"{self.path=}\n" f"{self.username=}").replace("self.", "")

def _make_con(self):
import duckdb

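Both new __repr__ methods use the same trick: the f-string "=" specifier renders each attribute as self.attr=<value>, and the trailing .replace("self.", "") turns that into plain attribute=value lines. A minimal standalone sketch of the formatting, with made-up attribute values for illustration only:

class FakeClient:
    def __init__(self):
        self.path = "warehouse.db"  # illustrative values, not from lea
        self.username = "max"

    def __repr__(self):
        # f"{self.path=}" expands to "self.path='warehouse.db'"; stripping "self." leaves "path='warehouse.db'"
        return ("Running on DuckDB\n" f"{self.path=}\n" f"{self.username=}").replace("self.", "")

print(repr(FakeClient()))
# Running on DuckDB
# path='warehouse.db'
# username='max'

This is the text the new runner.log(repr(runner.client)) calls in cli.py print at startup.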
257 changes: 120 additions & 137 deletions lea/runner.py
@@ -1,12 +1,14 @@
from __future__ import annotations

import concurrent.futures
import dataclasses
import datetime as dt
import functools
import io
import pathlib
import pickle
import re
import sys
import time
import warnings

@@ -20,10 +22,11 @@

console = rich.console.Console(force_interactive=True)

RUNNING = "[cyan]RUNNING"
RUNNING = "[white]RUNNING"
SUCCESS = "[green]SUCCESS"
ERRORED = "[red]ERRORED"
SKIPPED = "[yellow]SKIPPED"
STOPPED = "[cyan]STOPPED"


def _do_nothing(*args, **kwargs):
@@ -38,6 +41,30 @@ def sizeof_fmt(num, suffix="B"):
return f"{num:.1f}Yi{suffix}"


@dataclasses.dataclass
class RefreshJob:
future: concurrent.futures.Future
view: lea.views.View
started_at: dt.datetime = dataclasses.field(default_factory=dt.datetime.now)
finished_at: dt.datetime | None = None
skipped: bool = False
stopped: bool = False

@property
def status(self):
if self.stopped:
return STOPPED
if self.skipped:
return SKIPPED
if self.future.done():
return SUCCESS if self.future.exception() is None else ERRORED
return RUNNING

@property
def error(self) -> BaseException | None:
return self.future.exception()

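The status property derives the displayed state purely from the future and the two flags, in order of precedence: stopped, then skipped, then the future's outcome, otherwise running. A rough sketch of how it resolves, using a placeholder string in place of a real lea.views.View:

import concurrent.futures

with concurrent.futures.ThreadPoolExecutor() as executor:
    ok = RefreshJob(future=executor.submit(lambda: 1), view="demo.view")        # placeholder view
    bad = RefreshJob(future=executor.submit(lambda: 1 / 0), view="demo.view")   # raises ZeroDivisionError
    concurrent.futures.wait([ok.future, bad.future])

print(ok.status)   # [green]SUCCESS
print(bad.status)  # [red]ERRORED, and bad.error holds the ZeroDivisionError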

class Runner:
def __init__(self, views_dir: pathlib.Path | str, client: lea.clients.Client, verbose=False):
self.views_dir = pathlib.Path(views_dir) if isinstance(views_dir, str) else views_dir
@@ -238,6 +265,21 @@ def _make_table_reference_mapping(
def prepare(self):
self.client.prepare(self.regular_views.values())

def _make_job(
self,
view: lea.views.View,
executor: concurrent.futures.Executor,
dry: bool,
print_views: bool,
) -> RefreshJob:
if dry:
func = _do_nothing
elif print_views:
func = functools.partial(console.print, view)
else:
func = functools.partial(self.client.materialize_view, view=view)
return RefreshJob(future=executor.submit(func), view=view)

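_make_job picks the callable to submit: a no-op when dry, console.print when print_views is set, otherwise the real materialization, each wrapped with functools.partial so the executor receives a zero-argument callable. The same submission pattern in isolation, with a stand-in materialize function (the names below are illustrative, not part of lea):

import concurrent.futures
import functools

def materialize(view):  # stand-in for client.materialize_view
    return f"built {view}"

with concurrent.futures.ThreadPoolExecutor(max_workers=2) as executor:
    # partial binds the keyword argument now; the executor calls the resulting zero-argument callable later
    future = executor.submit(functools.partial(materialize, view="core.users"))
    print(future.result())  # built core.users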
def run(
self,
select: list[str],
@@ -272,133 +314,84 @@ def run(
self.client.delete_table_reference(existing_view_keys[view_key])
self.log(f"Removed {'.'.join(view_key)}")

def display_progress() -> rich.table.Table | None:
if not self.verbose:
return None
table = rich.table.Table(box=None)
table.add_column("#")
table.add_column("view")
table.add_column("status")
table.add_column("duration", justify="right")
table.add_column("cost", justify="right")

def get_status(view_key):
if view_key in exceptions:
return ERRORED
elif view_key in skipped:
return SKIPPED
elif view_key in jobs_ended_at:
return SUCCESS
return RUNNING

not_done = [view_key for view_key in execution_order if view_key not in cache]
statuses = {view_key: get_status(view_key) for view_key in not_done}
not_done = [view_key for view_key in not_done if statuses[view_key] != RUNNING] + [
view_key for view_key in not_done if statuses[view_key] == RUNNING
]
for i, view_key in list(enumerate(not_done, start=1))[-show:]:
status = statuses[view_key]
duration = (
(jobs_ended_at.get(view_key, dt.datetime.now()) - jobs_started_at[view_key])
if view_key in jobs_started_at
else None
)
# Round to the closest second
duration_str = f"{int(round(duration.total_seconds()))}s" if duration else ""
result = jobs[view_key].result() if status == SUCCESS else None
cost = result.cost if result else None
table.add_row(
str(i) if status != RUNNING else "",
str(self.views[view_key]),
status,
duration_str,
"" if cost is None else f"${cost:,.5f}",
)

return table

executor = concurrent.futures.ThreadPoolExecutor(max_workers=threads)
jobs = {}
execution_order = []
jobs_started_at = {}
jobs_ended_at = {}
exceptions = {}
skipped = set()
jobs: dict[tuple[str, ...], RefreshJob] = {}
cache_path = pathlib.Path(".cache.pkl")
cache = set() if fresh or not cache_path.exists() else pickle.loads(cache_path.read_bytes())
stop = False
tic = time.time()

if cache:
self.log(f"{len(cache):,d} views already done")

with rich.live.Live(display_progress(), vertical_overflow="ellipsis") as live:
while self.dag.is_active():
for view_key in self.dag.get_ready():
# Check if the view_key can be skipped or not
if view_key not in selected_view_keys:
self.dag.done(view_key)
continue
execution_order.append(view_key)

# A view can only be computed if all its dependencies have been computed
# successfully

if any(
dep_key in skipped or dep_key in exceptions
for dep_key in self.views[view_key].dependent_view_keys
):
skipped.add(view_key)
self.dag.done(view_key)
continue

# Submit a job, or print, or do nothing
if dry or view_key in cache:
job = _do_nothing
elif print_views:
job = functools.partial(
console.print,
self.views[view_key].with_context(
table_reference_mapping=table_reference_mapping
),
)
else:
job = functools.partial(
self.client.materialize_view,
view=self.views[view_key].with_context(
table_reference_mapping=table_reference_mapping
),
)
jobs[view_key] = executor.submit(job)
jobs_started_at[view_key] = dt.datetime.now()

# Check if any jobs are done. We notify the DAG by calling done when a job is done,
# which will unlock the next views.
for view_key in jobs_started_at:
if view_key not in jobs_ended_at and jobs[view_key].done():
self.dag.done(view_key)
jobs_ended_at[view_key] = dt.datetime.now()
# Determine whether the job succeeded or not
if exception := jobs[view_key].exception():
exceptions[view_key] = exception
if fail_fast:
raise RuntimeError(
f"Error in {self.views[view_key]}"
) from exception

if (progress := display_progress()) is not None:
live.update(progress)
while self.dag.is_active():
if stop:
for job in jobs.values():
if job.status == RUNNING:
job.future.cancel()
job.stopped = True
job.finished_at = dt.datetime.now()
console.log(f"{STOPPED} {job.view}")
break

# Start new jobs
for view_key in self.dag.get_ready():
if view_key in jobs and jobs[view_key].finished_at is not None:
continue

# Check if the view_key can be skipped or not
if view_key not in selected_view_keys:
self.dag.done(view_key)
continue

# We can't refresh a view if its dependencies had errors (or were skipped
# because their dependencies had errors)
if any(
jobs[dep_key].status in {ERRORED, SKIPPED}
for dep_key in self.views[view_key].dependent_view_keys
):
self.dag.done(view_key)
console.log(f"{SKIPPED} {self.views[view_key]}")
continue

# Submit a job, or print, or do nothing
job = self._make_job(
view=self.views[view_key].with_context(
table_reference_mapping=table_reference_mapping
),
executor=executor,
dry=dry or view_key in cache,
print_views=print_views,
)
jobs[view_key] = job
console.log(f"{RUNNING} {job.view}")

# Check if any jobs are done
unfinished_jobs = (job for job in jobs.values() if job.finished_at is None)
for job in unfinished_jobs:
if job.status == RUNNING:
if (duration := dt.datetime.now() - job.started_at) > dt.timedelta(seconds=10):
duration_str = f"{round(duration.total_seconds(), 1)}s"
console.log(f"{RUNNING} {job.view} after {duration_str}")
continue
job.finished_at = dt.datetime.now()
self.dag.done(job.view.key)
if job.status == ERRORED:
console.log(f"{ERRORED} {job.view}", style="red")
if fail_fast:
stop = True
break
if job.status == SUCCESS:
duration = job.finished_at - job.started_at
duration_str = f"{round(duration.total_seconds(), 1)}s"
console.log(f"{SUCCESS} {job.view} in {duration_str}")

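The loop above is a topological scheduler: get_ready() hands out views whose dependencies are settled, each becomes a RefreshJob on the thread pool, and done() is called once a job finishes so downstream views become ready. The dag object appears to expose the graphlib.TopologicalSorter interface (get_ready, done, is_active); a minimal sketch of that pattern with plain strings instead of views, over a made-up dependency graph:

import concurrent.futures
import graphlib
import time

graph = {"staging.users": set(), "core.users": {"staging.users"}}  # hypothetical dependencies
dag = graphlib.TopologicalSorter(graph)
dag.prepare()

with concurrent.futures.ThreadPoolExecutor() as executor:
    futures = {}
    while dag.is_active():
        for key in dag.get_ready():
            futures[key] = executor.submit(lambda k=key: f"built {k}")
        for key, future in list(futures.items()):
            if future.done():
                dag.done(key)   # unlocks views that depended on key
                del futures[key]
        time.sleep(0.01)        # avoid a busy loop while jobs run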
# Save the cache
all_done = not exceptions and not skipped
all_jobs_succeeded = all(job.status == SUCCESS for job in jobs.values())
cache = (
set()
if all_done
else cache
| {
view_key
for view_key in execution_order
if view_key not in exceptions and view_key not in skipped
}
if all_jobs_succeeded
else cache | {view_key for view_key, job in jobs.items() if job.status == SUCCESS}
)
if cache:
cache_path.write_bytes(pickle.dumps(cache))
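The .cache.pkl file is a pickled set of view keys that already built successfully: after a partial failure the successes are added so a re-run skips them, and once every job succeeds the set is reset. A small sketch of that lifecycle, with hypothetical view keys (the exact reset handling continues past the visible diff):

import pathlib
import pickle

cache_path = pathlib.Path(".cache.pkl")

# After a partially failed run, persist the views that did succeed
cache = {("staging", "users"), ("staging", "orders")}
cache_path.write_bytes(pickle.dumps(cache))

# On the next run, load the cache and treat those views as already done
cache = pickle.loads(cache_path.read_bytes()) if cache_path.exists() else set()

# Once everything succeeds, start from a clean slate
cache = set()
cache_path.unlink(missing_ok=True)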
@@ -408,30 +401,21 @@ def get_status(view_key):
# Summary statistics
if not self.verbose:
return
self.log(f"Took {round(time.time() - tic)}s")
summary = rich.table.Table()
summary.add_column("status")
summary.add_column("count")
if n := len(jobs_ended_at) - len(exceptions):
summary.add_row(SUCCESS, f"{n:,d}")
if n := len(exceptions):
summary.add_row(ERRORED, f"{n:,d}")
if n := len(skipped):
summary.add_row(SKIPPED, f"{n:,d}")
self.print(summary)
self.log(f"Finished in {round(time.time() - tic, 1)}s")

# Summary of errors
if exceptions:
for view_key, exception in exceptions.items():
self.print(str(self.views[view_key]), style="bold red")
self.print(exception)

if fail_fast:
raise Exception("Some views failed to build")
at_least_one_error = False
for job in jobs.values():
if job.error:
at_least_one_error = True
self.print(str(job.view), style="bold red")
self.print(job.error)
if at_least_one_error:
return sys.exit(1)

# In WAP mode, the tables get created with a suffix to mimic a staging environment. We
# need to switch the tables to the production environment.
if self.client.wap_mode and not exceptions and not dry:
if self.client.wap_mode and all_jobs_succeeded and not dry:
# In WAP mode, we want to guarantee the new tables are correct. Therefore, we run tests
# on them before switching.
self.test(
@@ -508,8 +492,7 @@ def test(self, select_views: list[str], freeze_unselected: bool, threads: int, f
self.log(f"FAILURE {test}", style="bold red")
self.log(conflicts.head())
if fail_fast:
# TODO: print out the query to help quick debugging
raise RuntimeError(f"Test {test} failed")
return sys.exit(1)

def make_docs(self, output_dir: str):
output_dir_path = pathlib.Path(output_dir)