From aab6baacea7376723bd8f4d8496d677b0b5bd9b2 Mon Sep 17 00:00:00 2001
From: Dave
Date: Tue, 27 Feb 2024 11:41:09 +0100
Subject: [PATCH 1/2] fix missing arrow compute for incrementals on arrow loads

---
 dlt/extract/incremental/transform.py | 5 +++++
 1 file changed, 5 insertions(+)

diff --git a/dlt/extract/incremental/transform.py b/dlt/extract/incremental/transform.py
index 2fc78fe4ee..29202c31ae 100644
--- a/dlt/extract/incremental/transform.py
+++ b/dlt/extract/incremental/transform.py
@@ -11,6 +11,11 @@
 except ModuleNotFoundError:
     np = None
 
+try:
+    import pyarrow.compute as pc
+except ModuleNotFoundError:
+    pc = None
+
 from dlt.common.exceptions import MissingDependencyException
 from dlt.common.utils import digest128
 from dlt.common.json import json

From aa0db6d583314766167ff1eddb94efa34369c831 Mon Sep 17 00:00:00 2001
From: Dave
Date: Fri, 1 Mar 2024 20:44:42 +0100
Subject: [PATCH 2/2] fix numpy and pandas imports

---
 dlt/common/libs/numpy.py             |  6 ++++++
 dlt/common/libs/pyarrow.py           |  1 +
 dlt/extract/extractors.py            | 12 +++++++-----
 dlt/extract/incremental/__init__.py  | 13 ++++++++-----
 dlt/extract/incremental/transform.py | 23 ++++++-----------------
 dlt/helpers/streamlit_helper.py      |  8 ++++----
 6 files changed, 32 insertions(+), 31 deletions(-)
 create mode 100644 dlt/common/libs/numpy.py

diff --git a/dlt/common/libs/numpy.py b/dlt/common/libs/numpy.py
new file mode 100644
index 0000000000..ccf255c6a8
--- /dev/null
+++ b/dlt/common/libs/numpy.py
@@ -0,0 +1,6 @@
+from dlt.common.exceptions import MissingDependencyException
+
+try:
+    import numpy
+except ModuleNotFoundError:
+    raise MissingDependencyException("DLT Numpy Helpers", ["numpy"])
diff --git a/dlt/common/libs/pyarrow.py b/dlt/common/libs/pyarrow.py
index 31423665f7..2f556d1d00 100644
--- a/dlt/common/libs/pyarrow.py
+++ b/dlt/common/libs/pyarrow.py
@@ -14,6 +14,7 @@
 try:
     import pyarrow
     import pyarrow.parquet
+    import pyarrow.compute
 except ModuleNotFoundError:
     raise MissingDependencyException(
         "dlt parquet Helpers", [f"{version.DLT_PKG_NAME}[parquet]"], "dlt Helpers for for parquet."
diff --git a/dlt/extract/extractors.py b/dlt/extract/extractors.py
index f6c3fde5d4..ec1bccbbfd 100644
--- a/dlt/extract/extractors.py
+++ b/dlt/extract/extractors.py
@@ -30,9 +30,9 @@
     pyarrow = None
 
 try:
-    import pandas as pd
-except ModuleNotFoundError:
-    pd = None
+    from dlt.common.libs.pandas import pandas
+except MissingDependencyException:
+    pandas = None
 
 
 class Extractor:
@@ -78,7 +78,9 @@ def item_format(items: TDataItems) -> Optional[TLoaderFileFormat]:
         """
         for item in items if isinstance(items, list) else [items]:
             # Assume all items in list are the same type
-            if (pyarrow and pyarrow.is_arrow_item(item)) or (pd and isinstance(item, pd.DataFrame)):
+            if (pyarrow and pyarrow.is_arrow_item(item)) or (
+                pandas and isinstance(item, pandas.DataFrame)
+            ):
                 return "arrow"
             return "puae-jsonl"
         return None  # Empty list is unknown format
@@ -222,7 +224,7 @@ def write_items(self, resource: DltResource, items: TDataItems, meta: Any) -> None:
                 (
                     # 1. Convert pandas frame(s) to arrow Table
                     pa.Table.from_pandas(item)
-                    if (pd and isinstance(item, pd.DataFrame))
+                    if (pandas and isinstance(item, pandas.DataFrame))
                     else item
                 )
                 for item in (items if isinstance(items, list) else [items])
diff --git a/dlt/extract/incremental/__init__.py b/dlt/extract/incremental/__init__.py
index 955aa12efd..7a6521e820 100644
--- a/dlt/extract/incremental/__init__.py
+++ b/dlt/extract/incremental/__init__.py
@@ -1,13 +1,11 @@
 import os
 from typing import Generic, ClassVar, Any, Optional, Type, Dict
 from typing_extensions import get_origin, get_args
+
 import inspect
 from functools import wraps
 
-try:
-    import pandas as pd
-except ModuleNotFoundError:
-    pd = None
+
 
 import dlt
 from dlt.common.exceptions import MissingDependencyException
@@ -50,6 +48,11 @@
 except MissingDependencyException:
     is_arrow_item = lambda item: False
 
+try:
+    from dlt.common.libs.pandas import pandas
+except MissingDependencyException:
+    pandas = None
+
 
 @configspec
 class Incremental(ItemTransform[TDataItem], BaseConfiguration, Generic[TCursorValue]):
@@ -397,7 +400,7 @@ def _get_transformer(self, items: TDataItems) -> IncrementalTransform:
         for item in items if isinstance(items, list) else [items]:
             if is_arrow_item(item):
                 return self._transformers["arrow"]
-            elif pd is not None and isinstance(item, pd.DataFrame):
+            elif pandas is not None and isinstance(item, pandas.DataFrame):
                 return self._transformers["arrow"]
             return self._transformers["json"]
         return self._transformers["json"]
diff --git a/dlt/extract/incremental/transform.py b/dlt/extract/incremental/transform.py
index 29202c31ae..1356faecc7 100644
--- a/dlt/extract/incremental/transform.py
+++ b/dlt/extract/incremental/transform.py
@@ -1,21 +1,6 @@
 from datetime import datetime, date  # noqa: I251
 from typing import Any, Optional, Tuple, List
 
-try:
-    import pandas as pd
-except ModuleNotFoundError:
-    pd = None
-
-try:
-    import numpy as np
-except ModuleNotFoundError:
-    np = None
-
-try:
-    import pyarrow.compute as pc
-except ModuleNotFoundError:
-    pc = None
-
 from dlt.common.exceptions import MissingDependencyException
 from dlt.common.utils import digest128
 from dlt.common.json import json
@@ -33,11 +18,15 @@
 
 try:
     from dlt.common.libs import pyarrow
+    from dlt.common.libs.pandas import pandas
+    from dlt.common.libs.numpy import numpy
     from dlt.common.libs.pyarrow import pyarrow as pa, TAnyArrowItem
     from dlt.common.libs.pyarrow import from_arrow_compute_output, to_arrow_compute_input
 except MissingDependencyException:
     pa = None
     pyarrow = None
+    numpy = None
+    pandas = None
 
 
 class IncrementalTransform:
@@ -198,14 +187,14 @@ def _deduplicate(
         """Creates unique index if necessary."""
         # create unique index if necessary
         if self._dlt_index not in tbl.schema.names:
-            tbl = pyarrow.append_column(tbl, self._dlt_index, pa.array(np.arange(tbl.num_rows)))
+            tbl = pyarrow.append_column(tbl, self._dlt_index, pa.array(numpy.arange(tbl.num_rows)))
         return tbl
 
     def __call__(
         self,
         tbl: "TAnyArrowItem",
     ) -> Tuple[TDataItem, bool, bool]:
-        is_pandas = pd is not None and isinstance(tbl, pd.DataFrame)
+        is_pandas = pandas is not None and isinstance(tbl, pandas.DataFrame)
         if is_pandas:
             tbl = pa.Table.from_pandas(tbl)
 
diff --git a/dlt/helpers/streamlit_helper.py b/dlt/helpers/streamlit_helper.py
index d3e194b18d..f6b2f3a62f 100644
--- a/dlt/helpers/streamlit_helper.py
+++ b/dlt/helpers/streamlit_helper.py
@@ -9,7 +9,7 @@
 
 from dlt.common.destination.reference import WithStateSync
 from dlt.common.utils import flatten_list_or_items
-from dlt.common.libs.pandas import pandas as pd
+from dlt.common.libs.pandas import pandas
 from dlt.pipeline import Pipeline
 from dlt.pipeline.exceptions import CannotRestorePipelineException, SqlClientNotAvailable
 from dlt.pipeline.state_sync import load_state_from_destination
@@ -102,7 +102,7 @@ def write_load_status_page(pipeline: Pipeline) -> None:
     """Display pipeline loading information. Will be moved to dlt package once tested"""
 
     @cache_data(ttl=600)
-    def _query_data(query: str, schema_name: str = None) -> pd.DataFrame:
+    def _query_data(query: str, schema_name: str = None) -> pandas.DataFrame:
         try:
             with pipeline.sql_client(schema_name) as client:
                 with client.execute_query(query) as curr:
@@ -111,7 +111,7 @@ def _query_data(query: str, schema_name: str = None) -> pd.DataFrame:
             st.error("Cannot load data - SqlClient not available")
 
     @cache_data(ttl=5)
-    def _query_data_live(query: str, schema_name: str = None) -> pd.DataFrame:
+    def _query_data_live(query: str, schema_name: str = None) -> pandas.DataFrame:
         try:
             with pipeline.sql_client(schema_name) as client:
                 with client.execute_query(query) as curr:
@@ -244,7 +244,7 @@ def write_data_explorer_page(
     """
 
     @cache_data(ttl=60)
-    def _query_data(query: str, chunk_size: int = None) -> pd.DataFrame:
+    def _query_data(query: str, chunk_size: int = None) -> pandas.DataFrame:
         try:
             with pipeline.sql_client(schema_name) as client:
                 with client.execute_query(query) as curr: