Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account-related emails.

Already on GitHub? Sign in to your account

Import missing pyarrow compute for transforms on arrowitems #1010

Merged
merged 2 commits into from
Mar 2, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions dlt/common/libs/numpy.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
# dlt/common/libs/numpy.py — optional-dependency shim for numpy.
# Mirrors the pattern used by dlt.common.libs.pyarrow / .pandas: importing
# this module either yields a working `numpy` name or raises dlt's own
# MissingDependencyException, which callers catch to degrade gracefully
# (e.g. `except MissingDependencyException: numpy = None` elsewhere in this PR).
from dlt.common.exceptions import MissingDependencyException

try:
    import numpy
except ModuleNotFoundError:
    # Re-raise as the project-wide dependency error so callers can handle
    # "numpy not installed" uniformly instead of catching ModuleNotFoundError.
    raise MissingDependencyException("DLT Numpy Helpers", ["numpy"])
1 change: 1 addition & 0 deletions dlt/common/libs/pyarrow.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
try:
import pyarrow
import pyarrow.parquet
import pyarrow.compute
except ModuleNotFoundError:
raise MissingDependencyException(
"dlt parquet Helpers", [f"{version.DLT_PKG_NAME}[parquet]"], "dlt Helpers for for parquet."
Expand Down
12 changes: 7 additions & 5 deletions dlt/extract/extractors.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,9 +30,9 @@
pyarrow = None

try:
import pandas as pd
except ModuleNotFoundError:
pd = None
from dlt.common.libs.pandas import pandas
except MissingDependencyException:
pandas = None


class Extractor:
Expand Down Expand Up @@ -78,7 +78,9 @@ def item_format(items: TDataItems) -> Optional[TLoaderFileFormat]:
"""
for item in items if isinstance(items, list) else [items]:
# Assume all items in list are the same type
if (pyarrow and pyarrow.is_arrow_item(item)) or (pd and isinstance(item, pd.DataFrame)):
if (pyarrow and pyarrow.is_arrow_item(item)) or (
pandas and isinstance(item, pandas.DataFrame)
):
return "arrow"
return "puae-jsonl"
return None # Empty list is unknown format
Expand Down Expand Up @@ -222,7 +224,7 @@ def write_items(self, resource: DltResource, items: TDataItems, meta: Any) -> No
(
# 1. Convert pandas frame(s) to arrow Table
pa.Table.from_pandas(item)
if (pd and isinstance(item, pd.DataFrame))
if (pandas and isinstance(item, pandas.DataFrame))
else item
)
for item in (items if isinstance(items, list) else [items])
Expand Down
13 changes: 8 additions & 5 deletions dlt/extract/incremental/__init__.py
Original file line number Diff line number Diff line change
@@ -1,13 +1,11 @@
import os
from typing import Generic, ClassVar, Any, Optional, Type, Dict
from typing_extensions import get_origin, get_args

import inspect
from functools import wraps

try:
import pandas as pd
except ModuleNotFoundError:
pd = None


import dlt
from dlt.common.exceptions import MissingDependencyException
Expand Down Expand Up @@ -50,6 +48,11 @@
except MissingDependencyException:
is_arrow_item = lambda item: False

try:
from dlt.common.libs.pandas import pandas
except MissingDependencyException:
pandas = None


@configspec
class Incremental(ItemTransform[TDataItem], BaseConfiguration, Generic[TCursorValue]):
Expand Down Expand Up @@ -397,7 +400,7 @@ def _get_transformer(self, items: TDataItems) -> IncrementalTransform:
for item in items if isinstance(items, list) else [items]:
if is_arrow_item(item):
return self._transformers["arrow"]
elif pd is not None and isinstance(item, pd.DataFrame):
elif pandas is not None and isinstance(item, pandas.DataFrame):
return self._transformers["arrow"]
return self._transformers["json"]
return self._transformers["json"]
Expand Down
18 changes: 6 additions & 12 deletions dlt/extract/incremental/transform.py
Original file line number Diff line number Diff line change
@@ -1,16 +1,6 @@
from datetime import datetime, date # noqa: I251
from typing import Any, Optional, Tuple, List

try:
import pandas as pd
except ModuleNotFoundError:
pd = None

try:
import numpy as np
except ModuleNotFoundError:
np = None

from dlt.common.exceptions import MissingDependencyException
from dlt.common.utils import digest128
from dlt.common.json import json
Expand All @@ -28,11 +18,15 @@

try:
from dlt.common.libs import pyarrow
from dlt.common.libs.pandas import pandas
from dlt.common.libs.numpy import numpy
from dlt.common.libs.pyarrow import pyarrow as pa, TAnyArrowItem
from dlt.common.libs.pyarrow import from_arrow_compute_output, to_arrow_compute_input
except MissingDependencyException:
pa = None
pyarrow = None
numpy = None
pandas = None


class IncrementalTransform:
Expand Down Expand Up @@ -193,14 +187,14 @@ def _deduplicate(
"""Creates unique index if necessary."""
# create unique index if necessary
if self._dlt_index not in tbl.schema.names:
tbl = pyarrow.append_column(tbl, self._dlt_index, pa.array(np.arange(tbl.num_rows)))
tbl = pyarrow.append_column(tbl, self._dlt_index, pa.array(numpy.arange(tbl.num_rows)))
return tbl

def __call__(
self,
tbl: "TAnyArrowItem",
) -> Tuple[TDataItem, bool, bool]:
is_pandas = pd is not None and isinstance(tbl, pd.DataFrame)
is_pandas = pandas is not None and isinstance(tbl, pandas.DataFrame)
if is_pandas:
tbl = pa.Table.from_pandas(tbl)

Expand Down
8 changes: 4 additions & 4 deletions dlt/helpers/streamlit_helper.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
from dlt.common.destination.reference import WithStateSync
from dlt.common.utils import flatten_list_or_items

from dlt.common.libs.pandas import pandas as pd
from dlt.common.libs.pandas import pandas
from dlt.pipeline import Pipeline
from dlt.pipeline.exceptions import CannotRestorePipelineException, SqlClientNotAvailable
from dlt.pipeline.state_sync import load_state_from_destination
Expand Down Expand Up @@ -102,7 +102,7 @@ def write_load_status_page(pipeline: Pipeline) -> None:
"""Display pipeline loading information. Will be moved to dlt package once tested"""

@cache_data(ttl=600)
def _query_data(query: str, schema_name: str = None) -> pd.DataFrame:
def _query_data(query: str, schema_name: str = None) -> pandas.DataFrame:
try:
with pipeline.sql_client(schema_name) as client:
with client.execute_query(query) as curr:
Expand All @@ -111,7 +111,7 @@ def _query_data(query: str, schema_name: str = None) -> pd.DataFrame:
st.error("Cannot load data - SqlClient not available")

@cache_data(ttl=5)
def _query_data_live(query: str, schema_name: str = None) -> pd.DataFrame:
def _query_data_live(query: str, schema_name: str = None) -> pandas.DataFrame:
try:
with pipeline.sql_client(schema_name) as client:
with client.execute_query(query) as curr:
Expand Down Expand Up @@ -244,7 +244,7 @@ def write_data_explorer_page(
"""

@cache_data(ttl=60)
def _query_data(query: str, chunk_size: int = None) -> pd.DataFrame:
def _query_data(query: str, chunk_size: int = None) -> pandas.DataFrame:
try:
with pipeline.sql_client(schema_name) as client:
with client.execute_query(query) as curr:
Expand Down
Loading