From 4ba5eae94d38f1882b0aeada743e9928cc8e5b5f Mon Sep 17 00:00:00 2001 From: Dave Date: Wed, 27 Nov 2024 11:12:52 +0100 Subject: [PATCH 1/2] fix error in alive bar --- dlt/common/runtime/collector.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/dlt/common/runtime/collector.py b/dlt/common/runtime/collector.py index be5453cdd3..1429d58f18 100644 --- a/dlt/common/runtime/collector.py +++ b/dlt/common/runtime/collector.py @@ -335,7 +335,8 @@ def update( bar = self._bars[key] = bar.__enter__() # if message: # bar.set_postfix_str(message) - bar(inc) + if inc > 0: + bar(inc) def _start(self, step: str) -> None: self._bars = {} From 1f5787e222bd1a6d7cee6c0ec5ba56170ebe5b45 Mon Sep 17 00:00:00 2001 From: Dave Date: Wed, 27 Nov 2024 12:54:45 +0100 Subject: [PATCH 2/2] allow to increate totals on counters --- dlt/common/runtime/collector.py | 81 ++++++++++++++++++++++++++++++--- dlt/load/load.py | 1 + 2 files changed, 75 insertions(+), 7 deletions(-) diff --git a/dlt/common/runtime/collector.py b/dlt/common/runtime/collector.py index 1429d58f18..8504334281 100644 --- a/dlt/common/runtime/collector.py +++ b/dlt/common/runtime/collector.py @@ -37,7 +37,13 @@ class Collector(ABC): @abstractmethod def update( - self, name: str, inc: int = 1, total: int = None, message: str = None, label: str = None + self, + name: str, + inc: int = 1, + total: int = None, + inc_total: int = None, + message: str = None, + label: str = None, ) -> None: """Creates or updates a counter @@ -48,6 +54,7 @@ def update( name (str): An unique name of a counter, displayable. inc (int, optional): Increase amount. Defaults to 1. total (int, optional): Maximum value of a counter. Defaults to None which means unbound counter. + icn_total (int, optional): Increase the maximum value of the counter, does nothing if counter does not exit yet message (str, optional): Additional message attached to a counter. Defaults to None. label (str, optional): Creates nested counter for counter `name`. Defaults to None. """ @@ -80,7 +87,13 @@ class NullCollector(Collector): """A default counter that does not count anything.""" def update( - self, name: str, inc: int = 1, total: int = None, message: str = None, label: str = None + self, + name: str, + inc: int = 1, + total: int = None, + inc_total: int = None, + message: str = None, + label: str = None, ) -> None: pass @@ -98,7 +111,13 @@ def __init__(self) -> None: self.counters: DefaultDict[str, int] = None def update( - self, name: str, inc: int = 1, total: int = None, message: str = None, label: str = None + self, + name: str, + inc: int = 1, + total: int = None, + inc_total: int = None, + message: str = None, + label: str = None, ) -> None: assert not label, "labels not supported in dict collector" self.counters[name] += inc @@ -158,7 +177,13 @@ def __init__( self.last_log_time: float = None def update( - self, name: str, inc: int = 1, total: int = None, message: str = None, label: str = None + self, + name: str, + inc: int = 1, + total: int = None, + inc_total: int = None, + message: str = None, + label: str = None, ) -> None: counter_key = f"{name}_{label}" if label else name @@ -171,6 +196,14 @@ def update( ) self.messages[counter_key] = None self.last_log_time = None + else: + counter_info = self.counter_info[counter_key] + if inc_total: + self.counter_info[counter_key] = LogCollector.CounterInfo( + description=counter_info.description, + start_time=counter_info.start_time, + total=counter_info.total + inc_total, + ) self.counters[counter_key] += inc if message is not None: @@ -264,7 +297,13 @@ def __init__(self, single_bar: bool = False, **tqdm_kwargs: Any) -> None: self.tqdm_kwargs = tqdm_kwargs or {} def update( - self, name: str, inc: int = 1, total: int = None, message: str = None, label: str = "" + self, + name: str, + inc: int = 1, + total: int = None, + inc_total: int = None, + message: str = None, + label: str = "", ) -> None: key = f"{name}_{label}" bar = self._bars.get(key) @@ -281,6 +320,10 @@ def update( bar = tqdm(desc=desc, total=total, leave=False, **self.tqdm_kwargs) bar.refresh() self._bars[key] = bar + else: + if inc_total: + bar.total += inc_total + bar.refresh() if message: bar.set_postfix_str(message) bar.update(inc) @@ -312,11 +355,18 @@ def __init__(self, single_bar: bool = True, **alive_kwargs: Any) -> None: ) self.single_bar = single_bar self._bars: Dict[str, Any] = {} + self._bars_counts: Dict[str, int] = {} self._bars_contexts: Dict[str, ContextManager[Any]] = {} self.alive_kwargs = alive_kwargs or {} def update( - self, name: str, inc: int = 1, total: int = None, message: str = None, label: str = "" + self, + name: str, + inc: int = 1, + total: int = None, + inc_total: int = None, + message: str = None, + label: str = "", ) -> None: key = f"{name}_{label}" bar = self._bars.get(key) @@ -333,20 +383,28 @@ def update( bar = alive_bar(total=total, title=desc, **self.alive_kwargs) self._bars_contexts[key] = bar bar = self._bars[key] = bar.__enter__() + self._bars_counts[key] = 0 + else: + # TODO: implement once total change is supported + pass + # if message: # bar.set_postfix_str(message) if inc > 0: bar(inc) + self._bars_counts[key] += inc def _start(self, step: str) -> None: self._bars = {} self._bars_contexts = {} + self def _stop(self) -> None: for bar in self._bars_contexts.values(): bar.__exit__(None, None, None) self._bars.clear() self._bars_contexts.clear() + self._bars_counts.clear() class EnlightenCollector(Collector): @@ -377,7 +435,13 @@ def __init__(self, single_bar: bool = False, **enlighten_kwargs: Any) -> None: self.enlighten_kwargs = enlighten_kwargs def update( - self, name: str, inc: int = 1, total: int = None, message: str = None, label: str = "" + self, + name: str, + inc: int = 1, + total: int = None, + inc_total: int = None, + message: str = None, + label: str = "", ) -> None: key = f"{name}_{label}" bar = self._bars.get(key) @@ -392,6 +456,9 @@ def update( ) bar.refresh() self._bars[key] = bar + else: + if inc_total: + bar.total = bar.total + inc_total bar.update(inc) def _start(self, step: str) -> None: diff --git a/dlt/load/load.py b/dlt/load/load.py index 060b2c5d8e..ddbc7193ed 100644 --- a/dlt/load/load.py +++ b/dlt/load/load.py @@ -370,6 +370,7 @@ def create_followup_jobs( f"Job {starting_job.job_id()} CREATED a new FOLLOWUP JOB" f" {followup_job.new_file_path()} placed in new_jobs" ) + self.collector.update("Jobs", inc=0, inc_total=len(jobs)) def complete_jobs( self, load_id: str, jobs: Sequence[LoadJob], schema: Schema