From 7bf60933e69ef742aca5d561ef78414ec55dd7b8 Mon Sep 17 00:00:00 2001 From: IlyaFaer Date: Fri, 26 Jul 2024 13:17:17 +0400 Subject: [PATCH 1/8] WIP: implement normalizer progress tracking for JSON normalizer --- dlt/common/pipeline.py | 16 ++++++++++++++++ dlt/normalize/items_normalizers.py | 18 +++++++++++++++++- dlt/normalize/normalize.py | 21 +++++++++++++-------- dlt/normalize/worker.py | 19 ++++++++++++------- dlt/pipeline/pipeline.py | 13 ++++++++----- 5 files changed, 66 insertions(+), 21 deletions(-) diff --git a/dlt/common/pipeline.py b/dlt/common/pipeline.py index 1e1416eb53..76eef5da3a 100644 --- a/dlt/common/pipeline.py +++ b/dlt/common/pipeline.py @@ -212,6 +212,22 @@ class _ExtractInfo(NamedTuple): class ExtractInfo(StepInfo[ExtractMetrics], _ExtractInfo): # type: ignore[misc] """A tuple holding information on extracted data items. Returned by pipeline `extract` method.""" + @property + def total_rows_count(self): + """Return the total extracted rows count from all the jobs. + + Returns: + int: Total extracted rows count. + """ + count = 0 + + for _, metrics_list in self.metrics.items(): + for metrics in metrics_list: + for _, value in metrics["job_metrics"].items(): + count += value.items_count + + return count + def asdict(self) -> DictStrAny: """A dictionary representation of ExtractInfo that can be loaded with `dlt`""" d = super().asdict() diff --git a/dlt/normalize/items_normalizers.py b/dlt/normalize/items_normalizers.py index 5f84d57d7a..786947333d 100644 --- a/dlt/normalize/items_normalizers.py +++ b/dlt/normalize/items_normalizers.py @@ -7,7 +7,9 @@ from dlt.common.data_writers.writers import ArrowToObjectAdapter from dlt.common.json import custom_pua_decode, may_have_pua from dlt.common.normalizers.json.relational import DataItemNormalizer as RelationalNormalizer +from dlt.common.pipeline import ExtractInfo from dlt.common.runtime import signals +from dlt.common.runtime.collector import Collector, NULL_COLLECTOR from dlt.common.schema.typing import TSchemaEvolutionMode, TTableSchemaColumns, TSchemaContractDict from dlt.common.schema.utils import has_table_seen_data from dlt.common.storages import NormalizeStorage @@ -36,12 +38,16 @@ def __init__( schema: Schema, load_id: str, config: NormalizeConfiguration, + extract_info: ExtractInfo, + collector: Collector = NULL_COLLECTOR, ) -> None: self.item_storage = item_storage self.normalize_storage = normalize_storage self.schema = schema self.load_id = load_id self.config = config + self.collector = collector + self.extract_info = extract_info @abstractmethod def __call__(self, extracted_items_file: str, root_table_name: str) -> List[TSchemaUpdate]: ... 
@@ -55,8 +61,12 @@ def __init__( schema: Schema, load_id: str, config: NormalizeConfiguration, + extract_info: ExtractInfo, + collector: Collector = NULL_COLLECTOR, ) -> None: - super().__init__(item_storage, normalize_storage, schema, load_id, config) + super().__init__( + item_storage, normalize_storage, schema, load_id, config, extract_info, collector + ) self._table_contracts: Dict[str, TSchemaContractDict] = {} self._filtered_tables: Set[str] = set() self._filtered_tables_columns: Dict[str, Dict[str, TSchemaEvolutionMode]] = {} @@ -83,9 +93,15 @@ def _normalize_chunk( schema_name = schema.name normalize_data_fun = self.schema.normalize_data_item + total = self.extract_info.total_rows_count if self.extract_info else None + + import time + for item in items: + time.sleep(0.7) items_gen = normalize_data_fun(item, self.load_id, root_table_name) try: + self.collector.update("Items", total=total) should_descend: bool = None # use send to prevent descending into child rows when row was discarded while row_info := items_gen.send(should_descend): diff --git a/dlt/normalize/normalize.py b/dlt/normalize/normalize.py index 98154cd5cf..2259f902d3 100644 --- a/dlt/normalize/normalize.py +++ b/dlt/normalize/normalize.py @@ -23,6 +23,7 @@ from dlt.common.schema import TSchemaUpdate, Schema from dlt.common.schema.exceptions import CannotCoerceColumnException from dlt.common.pipeline import ( + ExtractInfo, NormalizeInfo, NormalizeMetrics, SupportsPipeline, @@ -48,6 +49,7 @@ class Normalize(Runnable[Executor], WithStepInfo[NormalizeMetrics, NormalizeInfo @with_config(spec=NormalizeConfiguration, sections=(known_sections.NORMALIZE,)) def __init__( self, + extract_info: ExtractInfo, collector: Collector = NULL_COLLECTOR, schema_storage: SchemaStorage = None, config: NormalizeConfiguration = config.value, @@ -58,6 +60,7 @@ def __init__( self.pool = NullExecutor() self.load_storage: LoadStorage = None self.schema_storage: SchemaStorage = None + self.extract_info = extract_info # setup storages self.create_storages() @@ -101,6 +104,8 @@ def map_parallel(self, schema: Schema, load_id: str, files: Sequence[str]) -> TW schema_dict, load_id, files, + self.extract_info, + self.collector, ) for files in chunk_files ] @@ -127,9 +132,6 @@ def map_parallel(self, schema: Schema, load_id: str, files: Sequence[str]) -> TW summary.file_metrics.extend(result.file_metrics) # update metrics self.collector.update("Files", len(result.file_metrics)) - self.collector.update( - "Items", sum(result.file_metrics, EMPTY_DATA_WRITER_METRICS).items_count - ) except CannotCoerceColumnException as exc: # schema conflicts resulting from parallel executing logger.warning( @@ -141,7 +143,12 @@ def map_parallel(self, schema: Schema, load_id: str, files: Sequence[str]) -> TW # schedule the task again schema_dict = schema.to_dict() # TODO: it's time for a named tuple - params = params[:3] + (schema_dict,) + params[4:] + params = ( + params[:3] + + (schema_dict,) + + params[4:] + + (self.extract_info, self.collector) + ) retry_pending: Future[TWorkerRV] = self.pool.submit( w_normalize_files, *params ) @@ -160,12 +167,11 @@ def map_single(self, schema: Schema, load_id: str, files: Sequence[str]) -> TWor schema.to_dict(), load_id, files, + self.extract_info, + self.collector, ) self.update_schema(schema, result.schema_updates) self.collector.update("Files", len(result.file_metrics)) - self.collector.update( - "Items", sum(result.file_metrics, EMPTY_DATA_WRITER_METRICS).items_count - ) return result def spool_files( @@ -294,7 +300,6 @@ def 
run(self, pool: Optional[Executor]) -> TRunMetrics: continue with self.collector(f"Normalize {schema.name} in {load_id}"): self.collector.update("Files", 0, len(schema_files)) - self.collector.update("Items", 0) self._step_info_start_load_id(load_id) self.spool_schema_files(load_id, schema, schema_files) diff --git a/dlt/normalize/worker.py b/dlt/normalize/worker.py index 10d0a00eb1..a027e73e2a 100644 --- a/dlt/normalize/worker.py +++ b/dlt/normalize/worker.py @@ -10,7 +10,8 @@ get_best_writer_spec, is_native_writer, ) -from dlt.common.utils import chunks +from dlt.common.pipeline import ExtractInfo +from dlt.common.runtime.collector import Collector, NULL_COLLECTOR from dlt.common.schema.typing import TStoredSchema, TTableSchema from dlt.common.storages import ( NormalizeStorage, @@ -20,6 +21,7 @@ ParsedLoadJobFileName, ) from dlt.common.schema import TSchemaUpdate, Schema +from dlt.common.utils import chunks from dlt.normalize.configuration import NormalizeConfiguration from dlt.normalize.exceptions import NormalizeJobFailed @@ -61,6 +63,8 @@ def w_normalize_files( stored_schema: TStoredSchema, load_id: str, extracted_items_files: Sequence[str], + extract_info: ExtractInfo, + collector: Collector = NULL_COLLECTOR, ) -> TWorkerRV: destination_caps = config.destination_capabilities schema_updates: List[TSchemaUpdate] = [] @@ -82,7 +86,10 @@ def w_normalize_files( load_storage = LoadStorage(False, supported_file_formats, loader_storage_config) def _get_items_normalizer( - parsed_file_name: ParsedLoadJobFileName, table_schema: TTableSchema + parsed_file_name: ParsedLoadJobFileName, + table_schema: TTableSchema, + extract_info: ExtractInfo, + collector: Collector = NULL_COLLECTOR, ) -> ItemsNormalizer: item_format = DataWriter.item_format_from_file_extension(parsed_file_name.file_format) @@ -183,11 +190,7 @@ def _get_items_normalizer( f" format {item_storage.writer_spec.file_format}" ) norm = item_normalizers[table_name] = cls( - item_storage, - normalize_storage, - schema, - load_id, - config, + item_storage, normalize_storage, schema, load_id, config, extract_info, collector ) return norm @@ -236,6 +239,8 @@ def _gather_metrics_and_close( normalizer = _get_items_normalizer( parsed_file_name, stored_schema["tables"].get(root_table_name, {"name": root_table_name}), + extract_info, + collector, ) logger.debug( f"Processing extracted items in {extracted_items_file} in load_id" diff --git a/dlt/pipeline/pipeline.py b/dlt/pipeline/pipeline.py index 4f29ca4c87..9d242bdb23 100644 --- a/dlt/pipeline/pipeline.py +++ b/dlt/pipeline/pipeline.py @@ -446,6 +446,7 @@ def extract( # commit load packages with state extract_step.commit_packages() return self._get_step_info(extract_step) + except Exception as exc: # emit step info step_info = self._get_step_info(extract_step) @@ -493,7 +494,10 @@ def _verify_destination_capabilities( @with_schemas_sync @with_config_section((known_sections.NORMALIZE,)) def normalize( - self, workers: int = 1, loader_file_format: TLoaderFileFormat = None + self, + workers: int = 1, + loader_file_format: TLoaderFileFormat = None, + extract_info: ExtractInfo = None, ) -> NormalizeInfo: """Normalizes the data prepared with `extract` method, infers the schema and creates load packages for the `load` method. 
Requires `destination` to be known.""" if is_interactive(): @@ -525,6 +529,7 @@ def normalize( collector=self.collector, config=normalize_config, schema_storage=self._schema_storage, + extract_info=extract_info, ) try: with signals.delayed_signals(): @@ -707,7 +712,7 @@ def run( # extract from the source if data is not None: - self.extract( + extract_info = self.extract( data, table_name=table_name, write_disposition=write_disposition, @@ -717,10 +722,8 @@ def run( schema_contract=schema_contract, refresh=refresh or self.refresh, ) - self.normalize(loader_file_format=loader_file_format) + self.normalize(loader_file_format=loader_file_format, extract_info=extract_info) return self.load(destination, dataset_name, credentials=credentials) - else: - return None @with_schemas_sync def sync_destination( From 164d65e54cab7abd551fe9e4adf89672e7cba05b Mon Sep 17 00:00:00 2001 From: IlyaFaer Date: Fri, 26 Jul 2024 13:48:38 +0400 Subject: [PATCH 2/8] refactor --- dlt/normalize/items_normalizers.py | 11 ++--------- dlt/normalize/normalize.py | 15 +++++---------- dlt/normalize/worker.py | 5 +---- dlt/pipeline/pipeline.py | 6 +++--- 4 files changed, 11 insertions(+), 26 deletions(-) diff --git a/dlt/normalize/items_normalizers.py b/dlt/normalize/items_normalizers.py index 786947333d..3f15e38548 100644 --- a/dlt/normalize/items_normalizers.py +++ b/dlt/normalize/items_normalizers.py @@ -38,7 +38,6 @@ def __init__( schema: Schema, load_id: str, config: NormalizeConfiguration, - extract_info: ExtractInfo, collector: Collector = NULL_COLLECTOR, ) -> None: self.item_storage = item_storage @@ -47,7 +46,6 @@ def __init__( self.load_id = load_id self.config = config self.collector = collector - self.extract_info = extract_info @abstractmethod def __call__(self, extracted_items_file: str, root_table_name: str) -> List[TSchemaUpdate]: ... 
@@ -61,12 +59,9 @@ def __init__( schema: Schema, load_id: str, config: NormalizeConfiguration, - extract_info: ExtractInfo, collector: Collector = NULL_COLLECTOR, ) -> None: - super().__init__( - item_storage, normalize_storage, schema, load_id, config, extract_info, collector - ) + super().__init__(item_storage, normalize_storage, schema, load_id, config, collector) self._table_contracts: Dict[str, TSchemaContractDict] = {} self._filtered_tables: Set[str] = set() self._filtered_tables_columns: Dict[str, Dict[str, TSchemaEvolutionMode]] = {} @@ -93,15 +88,13 @@ def _normalize_chunk( schema_name = schema.name normalize_data_fun = self.schema.normalize_data_item - total = self.extract_info.total_rows_count if self.extract_info else None - import time for item in items: time.sleep(0.7) items_gen = normalize_data_fun(item, self.load_id, root_table_name) try: - self.collector.update("Items", total=total) + self.collector.update("Items") should_descend: bool = None # use send to prevent descending into child rows when row was discarded while row_info := items_gen.send(should_descend): diff --git a/dlt/normalize/normalize.py b/dlt/normalize/normalize.py index 2259f902d3..f4242aae23 100644 --- a/dlt/normalize/normalize.py +++ b/dlt/normalize/normalize.py @@ -49,7 +49,7 @@ class Normalize(Runnable[Executor], WithStepInfo[NormalizeMetrics, NormalizeInfo @with_config(spec=NormalizeConfiguration, sections=(known_sections.NORMALIZE,)) def __init__( self, - extract_info: ExtractInfo, + extracted_count: int, collector: Collector = NULL_COLLECTOR, schema_storage: SchemaStorage = None, config: NormalizeConfiguration = config.value, @@ -60,7 +60,7 @@ def __init__( self.pool = NullExecutor() self.load_storage: LoadStorage = None self.schema_storage: SchemaStorage = None - self.extract_info = extract_info + self.extracted_count = extracted_count # setup storages self.create_storages() @@ -104,7 +104,6 @@ def map_parallel(self, schema: Schema, load_id: str, files: Sequence[str]) -> TW schema_dict, load_id, files, - self.extract_info, self.collector, ) for files in chunk_files @@ -143,12 +142,7 @@ def map_parallel(self, schema: Schema, load_id: str, files: Sequence[str]) -> TW # schedule the task again schema_dict = schema.to_dict() # TODO: it's time for a named tuple - params = ( - params[:3] - + (schema_dict,) - + params[4:] - + (self.extract_info, self.collector) - ) + params = params[:3] + (schema_dict,) + params[4:] + (self.collector,) retry_pending: Future[TWorkerRV] = self.pool.submit( w_normalize_files, *params ) @@ -167,7 +161,6 @@ def map_single(self, schema: Schema, load_id: str, files: Sequence[str]) -> TWor schema.to_dict(), load_id, files, - self.extract_info, self.collector, ) self.update_schema(schema, result.schema_updates) @@ -300,6 +293,8 @@ def run(self, pool: Optional[Executor]) -> TRunMetrics: continue with self.collector(f"Normalize {schema.name} in {load_id}"): self.collector.update("Files", 0, len(schema_files)) + self.collector.update("Items", 0, total=self.extracted_count) + self._step_info_start_load_id(load_id) self.spool_schema_files(load_id, schema, schema_files) diff --git a/dlt/normalize/worker.py b/dlt/normalize/worker.py index a027e73e2a..d42d8d2ff1 100644 --- a/dlt/normalize/worker.py +++ b/dlt/normalize/worker.py @@ -63,7 +63,6 @@ def w_normalize_files( stored_schema: TStoredSchema, load_id: str, extracted_items_files: Sequence[str], - extract_info: ExtractInfo, collector: Collector = NULL_COLLECTOR, ) -> TWorkerRV: destination_caps = config.destination_capabilities @@ -88,7 
+87,6 @@ def w_normalize_files( def _get_items_normalizer( parsed_file_name: ParsedLoadJobFileName, table_schema: TTableSchema, - extract_info: ExtractInfo, collector: Collector = NULL_COLLECTOR, ) -> ItemsNormalizer: item_format = DataWriter.item_format_from_file_extension(parsed_file_name.file_format) @@ -190,7 +188,7 @@ def _get_items_normalizer( f" format {item_storage.writer_spec.file_format}" ) norm = item_normalizers[table_name] = cls( - item_storage, normalize_storage, schema, load_id, config, extract_info, collector + item_storage, normalize_storage, schema, load_id, config, collector ) return norm @@ -239,7 +237,6 @@ def _gather_metrics_and_close( normalizer = _get_items_normalizer( parsed_file_name, stored_schema["tables"].get(root_table_name, {"name": root_table_name}), - extract_info, collector, ) logger.debug( diff --git a/dlt/pipeline/pipeline.py b/dlt/pipeline/pipeline.py index 9d242bdb23..6c4cb7cf9d 100644 --- a/dlt/pipeline/pipeline.py +++ b/dlt/pipeline/pipeline.py @@ -497,7 +497,7 @@ def normalize( self, workers: int = 1, loader_file_format: TLoaderFileFormat = None, - extract_info: ExtractInfo = None, + extracted_count: int = None, ) -> NormalizeInfo: """Normalizes the data prepared with `extract` method, infers the schema and creates load packages for the `load` method. Requires `destination` to be known.""" if is_interactive(): @@ -529,7 +529,7 @@ def normalize( collector=self.collector, config=normalize_config, schema_storage=self._schema_storage, - extract_info=extract_info, + extracted_count=extracted_count, ) try: with signals.delayed_signals(): @@ -722,7 +722,7 @@ def run( schema_contract=schema_contract, refresh=refresh or self.refresh, ) - self.normalize(loader_file_format=loader_file_format, extract_info=extract_info) + self.normalize(loader_file_format=loader_file_format, extracted_count=extract_info.total_rows_count if extract_info else None) return self.load(destination, dataset_name, credentials=credentials) @with_schemas_sync From 057183f9334a67066479acc45a354bb4057fe705 Mon Sep 17 00:00:00 2001 From: IlyaFaer Date: Fri, 26 Jul 2024 13:52:53 +0400 Subject: [PATCH 3/8] minor fixes --- dlt/common/pipeline.py | 2 +- dlt/normalize/items_normalizers.py | 5 ++--- dlt/normalize/normalize.py | 1 - dlt/normalize/worker.py | 1 - 4 files changed, 3 insertions(+), 6 deletions(-) diff --git a/dlt/common/pipeline.py b/dlt/common/pipeline.py index 76eef5da3a..41e534b6b4 100644 --- a/dlt/common/pipeline.py +++ b/dlt/common/pipeline.py @@ -213,7 +213,7 @@ class ExtractInfo(StepInfo[ExtractMetrics], _ExtractInfo): # type: ignore[misc] """A tuple holding information on extracted data items. Returned by pipeline `extract` method.""" @property - def total_rows_count(self): + def total_rows_count(self) -> int: """Return the total extracted rows count from all the jobs. 
Returns: diff --git a/dlt/normalize/items_normalizers.py b/dlt/normalize/items_normalizers.py index 3f15e38548..8ec1b2de40 100644 --- a/dlt/normalize/items_normalizers.py +++ b/dlt/normalize/items_normalizers.py @@ -7,7 +7,6 @@ from dlt.common.data_writers.writers import ArrowToObjectAdapter from dlt.common.json import custom_pua_decode, may_have_pua from dlt.common.normalizers.json.relational import DataItemNormalizer as RelationalNormalizer -from dlt.common.pipeline import ExtractInfo from dlt.common.runtime import signals from dlt.common.runtime.collector import Collector, NULL_COLLECTOR from dlt.common.schema.typing import TSchemaEvolutionMode, TTableSchemaColumns, TSchemaContractDict @@ -88,10 +87,10 @@ def _normalize_chunk( schema_name = schema.name normalize_data_fun = self.schema.normalize_data_item - import time + # import time for item in items: - time.sleep(0.7) + # time.sleep(0.7) items_gen = normalize_data_fun(item, self.load_id, root_table_name) try: self.collector.update("Items") diff --git a/dlt/normalize/normalize.py b/dlt/normalize/normalize.py index f4242aae23..6242c963f1 100644 --- a/dlt/normalize/normalize.py +++ b/dlt/normalize/normalize.py @@ -23,7 +23,6 @@ from dlt.common.schema import TSchemaUpdate, Schema from dlt.common.schema.exceptions import CannotCoerceColumnException from dlt.common.pipeline import ( - ExtractInfo, NormalizeInfo, NormalizeMetrics, SupportsPipeline, diff --git a/dlt/normalize/worker.py b/dlt/normalize/worker.py index d42d8d2ff1..82a8cf88df 100644 --- a/dlt/normalize/worker.py +++ b/dlt/normalize/worker.py @@ -10,7 +10,6 @@ get_best_writer_spec, is_native_writer, ) -from dlt.common.pipeline import ExtractInfo from dlt.common.runtime.collector import Collector, NULL_COLLECTOR from dlt.common.schema.typing import TStoredSchema, TTableSchema from dlt.common.storages import ( From 37647ef6d097e9cf6576e8434e98e9b752937b66 Mon Sep 17 00:00:00 2001 From: IlyaFaer Date: Tue, 30 Jul 2024 11:13:33 +0400 Subject: [PATCH 4/8] minor fixes --- dlt/normalize/normalize.py | 4 ++-- dlt/pipeline/pipeline.py | 6 +++++- 2 files changed, 7 insertions(+), 3 deletions(-) diff --git a/dlt/normalize/normalize.py b/dlt/normalize/normalize.py index 6242c963f1..b7d4c6916a 100644 --- a/dlt/normalize/normalize.py +++ b/dlt/normalize/normalize.py @@ -48,7 +48,7 @@ class Normalize(Runnable[Executor], WithStepInfo[NormalizeMetrics, NormalizeInfo @with_config(spec=NormalizeConfiguration, sections=(known_sections.NORMALIZE,)) def __init__( self, - extracted_count: int, + extracted_count: Optional[int] = None, collector: Collector = NULL_COLLECTOR, schema_storage: SchemaStorage = None, config: NormalizeConfiguration = config.value, @@ -141,7 +141,7 @@ def map_parallel(self, schema: Schema, load_id: str, files: Sequence[str]) -> TW # schedule the task again schema_dict = schema.to_dict() # TODO: it's time for a named tuple - params = params[:3] + (schema_dict,) + params[4:] + (self.collector,) + params = params[:3] + (schema_dict,) + params[4:] retry_pending: Future[TWorkerRV] = self.pool.submit( w_normalize_files, *params ) diff --git a/dlt/pipeline/pipeline.py b/dlt/pipeline/pipeline.py index 6c4cb7cf9d..862a0a6f5f 100644 --- a/dlt/pipeline/pipeline.py +++ b/dlt/pipeline/pipeline.py @@ -722,8 +722,12 @@ def run( schema_contract=schema_contract, refresh=refresh or self.refresh, ) - self.normalize(loader_file_format=loader_file_format, extracted_count=extract_info.total_rows_count if extract_info else None) + self.normalize( + loader_file_format=loader_file_format, + 
extracted_count=extract_info.total_rows_count if extract_info else None, + ) return self.load(destination, dataset_name, credentials=credentials) + return None @with_schemas_sync def sync_destination( From cdd7a9d82c98a009b569a9427272d4b62b3529a3 Mon Sep 17 00:00:00 2001 From: IlyaFaer Date: Wed, 31 Jul 2024 12:31:07 +0400 Subject: [PATCH 5/8] add test --- tests/pipeline/test_pipeline_extra.py | 15 +++++++++++++++ 1 file changed, 15 insertions(+) diff --git a/tests/pipeline/test_pipeline_extra.py b/tests/pipeline/test_pipeline_extra.py index 308cdcd91d..50ef456bfc 100644 --- a/tests/pipeline/test_pipeline_extra.py +++ b/tests/pipeline/test_pipeline_extra.py @@ -2,6 +2,7 @@ import importlib.util from typing import Any, ClassVar, Dict, Iterator, List, Optional import pytest +from unittest import mock try: from pydantic import BaseModel @@ -99,6 +100,20 @@ def test_pipeline_progress(progress: TCollectorArg) -> None: assert isinstance(collector, LogCollector) +@pytest.mark.parametrize("progress", ["tqdm", "enlighten", "log", "alive_progress"]) +def test_pipeline_normalize_progress(progress: TCollectorArg) -> None: + os.environ["TIMEOUT"] = "3.0" + + p = dlt.pipeline(destination="dummy", progress=progress) + p.extract(many_delayed(5, 10)) + + with mock.patch.object(p.collector, "update") as col_mock: + p.normalize(extracted_count=10) + assert col_mock.call_count == 54 + + p.run(dataset_name="dummy") + + @pytest.mark.parametrize("method", ("extract", "run")) def test_column_argument_pydantic(method: str) -> None: """Test columns schema is created from pydantic model""" From 23e4b1411f1e61a5e45113c40244cbdc27ea02f1 Mon Sep 17 00:00:00 2001 From: IlyaFaer Date: Wed, 31 Jul 2024 12:32:16 +0400 Subject: [PATCH 6/8] del test lines --- dlt/normalize/items_normalizers.py | 3 --- 1 file changed, 3 deletions(-) diff --git a/dlt/normalize/items_normalizers.py b/dlt/normalize/items_normalizers.py index 8ec1b2de40..1be2f28829 100644 --- a/dlt/normalize/items_normalizers.py +++ b/dlt/normalize/items_normalizers.py @@ -87,10 +87,7 @@ def _normalize_chunk( schema_name = schema.name normalize_data_fun = self.schema.normalize_data_item - # import time - for item in items: - # time.sleep(0.7) items_gen = normalize_data_fun(item, self.load_id, root_table_name) try: self.collector.update("Items") From 35041321cb3b2eef40e53fbfc45c08ee262d5f5b Mon Sep 17 00:00:00 2001 From: IlyaFaer Date: Thu, 1 Aug 2024 14:53:34 +0400 Subject: [PATCH 7/8] use pipeline state for extract counting --- dlt/pipeline/pipeline.py | 16 ++++++++-------- tests/pipeline/test_pipeline_extra.py | 2 +- 2 files changed, 9 insertions(+), 9 deletions(-) diff --git a/dlt/pipeline/pipeline.py b/dlt/pipeline/pipeline.py index 862a0a6f5f..897e4aa506 100644 --- a/dlt/pipeline/pipeline.py +++ b/dlt/pipeline/pipeline.py @@ -445,7 +445,11 @@ def extract( ) # commit load packages with state extract_step.commit_packages() - return self._get_step_info(extract_step) + + info = self._get_step_info(extract_step) + state = self._container[StateInjectableContext].state + state["_local"]["_last_extracted_count"] = info.total_rows_count + return info except Exception as exc: # emit step info @@ -497,7 +501,6 @@ def normalize( self, workers: int = 1, loader_file_format: TLoaderFileFormat = None, - extracted_count: int = None, ) -> NormalizeInfo: """Normalizes the data prepared with `extract` method, infers the schema and creates load packages for the `load` method. 
Requires `destination` to be known.""" if is_interactive(): @@ -529,7 +532,7 @@ def normalize( collector=self.collector, config=normalize_config, schema_storage=self._schema_storage, - extracted_count=extracted_count, + extracted_count=self._get_state()["_local"].get("_last_extracted_count"), ) try: with signals.delayed_signals(): @@ -712,7 +715,7 @@ def run( # extract from the source if data is not None: - extract_info = self.extract( + self.extract( data, table_name=table_name, write_disposition=write_disposition, @@ -722,10 +725,7 @@ def run( schema_contract=schema_contract, refresh=refresh or self.refresh, ) - self.normalize( - loader_file_format=loader_file_format, - extracted_count=extract_info.total_rows_count if extract_info else None, - ) + self.normalize(loader_file_format=loader_file_format) return self.load(destination, dataset_name, credentials=credentials) return None diff --git a/tests/pipeline/test_pipeline_extra.py b/tests/pipeline/test_pipeline_extra.py index 50ef456bfc..2411a68827 100644 --- a/tests/pipeline/test_pipeline_extra.py +++ b/tests/pipeline/test_pipeline_extra.py @@ -108,7 +108,7 @@ def test_pipeline_normalize_progress(progress: TCollectorArg) -> None: p.extract(many_delayed(5, 10)) with mock.patch.object(p.collector, "update") as col_mock: - p.normalize(extracted_count=10) + p.normalize() assert col_mock.call_count == 54 p.run(dataset_name="dummy") From 491efaf9defad551adc1dca0668b1179f3aff23a Mon Sep 17 00:00:00 2001 From: IlyaFaer Date: Thu, 1 Aug 2024 14:57:12 +0400 Subject: [PATCH 8/8] lint fix --- dlt/common/pipeline.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/dlt/common/pipeline.py b/dlt/common/pipeline.py index 41e534b6b4..a7fc77109a 100644 --- a/dlt/common/pipeline.py +++ b/dlt/common/pipeline.py @@ -475,6 +475,8 @@ class TPipelineLocalState(TypedDict, total=False): """Timestamp indicating when the state was synced with the destination.""" _last_extracted_hash: str """Hash of state that was recently synced with destination""" + _last_extracted_count: int + """Number of extracted rows in the last run""" class TPipelineState(TVersionedState, total=False):
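
End-to-end, the series makes the normalize step report item progress against the row total captured during extract: patch 1 adds ExtractInfo.total_rows_count, patches 2-6 switch the collector to a single "Items" counter with a known total, and patches 7-8 persist that total in pipeline local state as _last_extracted_count so normalize() needs no extra argument. A minimal sketch of the resulting behavior, assuming dlt's public pipeline API; the resource and pipeline names (numbers, progress_demo) are made up for illustration and are not part of the diff:

    import dlt

    @dlt.resource
    def numbers():
        # 100 extracted rows; ExtractInfo.total_rows_count (patch 1) should sum to 100
        yield from ({"value": i} for i in range(100))

    # the "log" progress collector is one of the collectors exercised by the test in patch 5;
    # with this series its "Items" counter gets a total instead of counting open-ended
    pipeline = dlt.pipeline(pipeline_name="progress_demo", destination="dummy", progress="log")

    extract_info = pipeline.extract(numbers())
    print(extract_info.total_rows_count)  # 100, summed over all job metrics

    # extract() stored the count in local state (patches 7-8), so the collector
    # can show "Items n/100" here without normalize() taking a new parameter
    pipeline.normalize()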