Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

allow to increase total count on most progress bars, fixes incorrect output in load stage #2100

Merged
merged 2 commits into from
Nov 28, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
84 changes: 76 additions & 8 deletions dlt/common/runtime/collector.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,13 @@ class Collector(ABC):

@abstractmethod
def update(
self, name: str, inc: int = 1, total: int = None, message: str = None, label: str = None
self,
name: str,
inc: int = 1,
total: int = None,
inc_total: int = None,
message: str = None,
label: str = None,
) -> None:
"""Creates or updates a counter

Expand All @@ -48,6 +54,7 @@ def update(
name (str): An unique name of a counter, displayable.
inc (int, optional): Increase amount. Defaults to 1.
total (int, optional): Maximum value of a counter. Defaults to None which means unbound counter.
icn_total (int, optional): Increase the maximum value of the counter, does nothing if counter does not exit yet
message (str, optional): Additional message attached to a counter. Defaults to None.
label (str, optional): Creates nested counter for counter `name`. Defaults to None.
"""
Expand Down Expand Up @@ -80,7 +87,13 @@ class NullCollector(Collector):
"""A default counter that does not count anything."""

def update(
self, name: str, inc: int = 1, total: int = None, message: str = None, label: str = None
self,
name: str,
inc: int = 1,
total: int = None,
inc_total: int = None,
message: str = None,
label: str = None,
) -> None:
pass

Expand All @@ -98,7 +111,13 @@ def __init__(self) -> None:
self.counters: DefaultDict[str, int] = None

def update(
self, name: str, inc: int = 1, total: int = None, message: str = None, label: str = None
self,
name: str,
inc: int = 1,
total: int = None,
inc_total: int = None,
message: str = None,
label: str = None,
) -> None:
assert not label, "labels not supported in dict collector"
self.counters[name] += inc
Expand Down Expand Up @@ -158,7 +177,13 @@ def __init__(
self.last_log_time: float = None

def update(
self, name: str, inc: int = 1, total: int = None, message: str = None, label: str = None
self,
name: str,
inc: int = 1,
total: int = None,
inc_total: int = None,
message: str = None,
label: str = None,
) -> None:
counter_key = f"{name}_{label}" if label else name

Expand All @@ -171,6 +196,14 @@ def update(
)
self.messages[counter_key] = None
self.last_log_time = None
else:
counter_info = self.counter_info[counter_key]
if inc_total:
self.counter_info[counter_key] = LogCollector.CounterInfo(
description=counter_info.description,
start_time=counter_info.start_time,
total=counter_info.total + inc_total,
)

self.counters[counter_key] += inc
if message is not None:
Expand Down Expand Up @@ -264,7 +297,13 @@ def __init__(self, single_bar: bool = False, **tqdm_kwargs: Any) -> None:
self.tqdm_kwargs = tqdm_kwargs or {}

def update(
self, name: str, inc: int = 1, total: int = None, message: str = None, label: str = ""
self,
name: str,
inc: int = 1,
total: int = None,
inc_total: int = None,
message: str = None,
label: str = "",
) -> None:
key = f"{name}_{label}"
bar = self._bars.get(key)
Expand All @@ -281,6 +320,10 @@ def update(
bar = tqdm(desc=desc, total=total, leave=False, **self.tqdm_kwargs)
bar.refresh()
self._bars[key] = bar
else:
if inc_total:
bar.total += inc_total
bar.refresh()
if message:
bar.set_postfix_str(message)
bar.update(inc)
Expand Down Expand Up @@ -312,11 +355,18 @@ def __init__(self, single_bar: bool = True, **alive_kwargs: Any) -> None:
)
self.single_bar = single_bar
self._bars: Dict[str, Any] = {}
self._bars_counts: Dict[str, int] = {}
self._bars_contexts: Dict[str, ContextManager[Any]] = {}
self.alive_kwargs = alive_kwargs or {}

def update(
self, name: str, inc: int = 1, total: int = None, message: str = None, label: str = ""
self,
name: str,
inc: int = 1,
total: int = None,
inc_total: int = None,
message: str = None,
label: str = "",
) -> None:
key = f"{name}_{label}"
bar = self._bars.get(key)
Expand All @@ -333,19 +383,28 @@ def update(
bar = alive_bar(total=total, title=desc, **self.alive_kwargs)
self._bars_contexts[key] = bar
bar = self._bars[key] = bar.__enter__()
self._bars_counts[key] = 0
else:
# TODO: implement once total change is supported
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

alive bars don't seem to support updating the total, you can create a new one and remember the old settings, but then a new bar appears in the ui, and the old half files one stays..

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

alive is just for fun. so OK :)

pass

# if message:
# bar.set_postfix_str(message)
bar(inc)
if inc > 0:
bar(inc)
self._bars_counts[key] += inc

def _start(self, step: str) -> None:
self._bars = {}
self._bars_contexts = {}
self

def _stop(self) -> None:
for bar in self._bars_contexts.values():
bar.__exit__(None, None, None)
self._bars.clear()
self._bars_contexts.clear()
self._bars_counts.clear()


class EnlightenCollector(Collector):
Expand Down Expand Up @@ -376,7 +435,13 @@ def __init__(self, single_bar: bool = False, **enlighten_kwargs: Any) -> None:
self.enlighten_kwargs = enlighten_kwargs

def update(
self, name: str, inc: int = 1, total: int = None, message: str = None, label: str = ""
self,
name: str,
inc: int = 1,
total: int = None,
inc_total: int = None,
message: str = None,
label: str = "",
) -> None:
key = f"{name}_{label}"
bar = self._bars.get(key)
Expand All @@ -391,6 +456,9 @@ def update(
)
bar.refresh()
self._bars[key] = bar
else:
if inc_total:
bar.total = bar.total + inc_total
bar.update(inc)

def _start(self, step: str) -> None:
Expand Down
1 change: 1 addition & 0 deletions dlt/load/load.py
Original file line number Diff line number Diff line change
Expand Up @@ -370,6 +370,7 @@ def create_followup_jobs(
f"Job {starting_job.job_id()} CREATED a new FOLLOWUP JOB"
f" {followup_job.new_file_path()} placed in new_jobs"
)
self.collector.update("Jobs", inc=0, inc_total=len(jobs))

def complete_jobs(
self, load_id: str, jobs: Sequence[LoadJob], schema: Schema
Expand Down
Loading