Skip to content

Commit

Permalink
arrow as data (#723)
Browse files Browse the repository at this point in the history
* moves pandas helper to libs

* wrapps arrow type instances in list

* allows to remove incremental with explicit none, uses column search for simple jsonpath

* does not check resource binding when end_value provided

* updates arrow docs

* fixes aws credentials test

* fixes wrappers type tests

* bumps version to 0.3.23
  • Loading branch information
rudolfix authored Oct 31, 2023
1 parent c5484db commit 14d28a8
Show file tree
Hide file tree
Showing 17 changed files with 187 additions and 105 deletions.
2 changes: 1 addition & 1 deletion dlt/common/jsonpath.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@

from dlt.common.typing import DictStrAny

from jsonpath_ng import parse as _parse, JSONPath
from jsonpath_ng import parse as _parse, JSONPath, Fields as JSONPathFields


TJsonPath = Union[str, JSONPath] # Jsonpath compiled or str
Expand Down
7 changes: 7 additions & 0 deletions dlt/common/libs/pandas.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
from dlt.common.exceptions import MissingDependencyException

try:
import pandas
from pandas.io.sql import _wrap_result
except ModuleNotFoundError:
raise MissingDependencyException("DLT Pandas Helpers", ["pandas"])
2 changes: 1 addition & 1 deletion dlt/destinations/sql_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -172,7 +172,7 @@ def _get_columns(self) -> List[str]:
return [c[0] for c in self.native_cursor.description]

def df(self, chunk_size: int = None, **kwargs: Any) -> Optional[DataFrame]:
from dlt.helpers.pandas_helper import _wrap_result
from dlt.common.libs.pandas import _wrap_result

columns = self._get_columns()
if chunk_size is None:
Expand Down
2 changes: 1 addition & 1 deletion dlt/extract/decorators.py
Original file line number Diff line number Diff line change
Expand Up @@ -494,7 +494,7 @@ def transformer(
selected: bool = True,
spec: Type[BaseConfiguration] = None,
standalone: Literal[True] = True
) -> Callable[TResourceFunParams, DltResource]: # TODO: change back to Callable[TResourceFunParams, DltResource] when mypy 1.6 is fixed
) -> Callable[TResourceFunParams, DltResource]:
...

def transformer(
Expand Down
2 changes: 1 addition & 1 deletion dlt/extract/exceptions.py
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,7 @@ class InvalidResourceDataType(DltResourceException):
def __init__(self, resource_name: str, item: Any, _typ: Type[Any], msg: str) -> None:
self.item = item
self._typ = _typ
super().__init__(resource_name, f"Cannot create resource {resource_name} from specified data. " + msg)
super().__init__(resource_name, f"Cannot create resource {resource_name} from specified data. If you want to process just one data item, enclose it in a list. " + msg)


class InvalidResourceDataTypeAsync(InvalidResourceDataType):
Expand Down
33 changes: 18 additions & 15 deletions dlt/extract/incremental/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@
from dlt.extract.pipe import Pipe
from dlt.extract.utils import resolve_column_value
from dlt.extract.typing import SupportsPipe, TTableHintTemplate, MapItem, YieldMapItem, FilterItem, ItemTransform
from dlt.extract.incremental.transform import JsonIncremental, ArrowIncremental, IncrementalTransformer
from dlt.extract.incremental.transform import JsonIncremental, ArrowIncremental, IncrementalTransform
try:
from dlt.common.libs.pyarrow import is_arrow_item, pyarrow as pa, TAnyArrowItem
except MissingDependencyException:
Expand Down Expand Up @@ -87,9 +87,10 @@ def __init__(
end_value: Optional[TCursorValue] = None,
allow_external_schedulers: bool = False
) -> None:
# make sure that path is valid
if cursor_path:
compile_path(cursor_path)
self.cursor_path = cursor_path
if self.cursor_path:
self.cursor_path_p: JSONPath = compile_path(cursor_path)
self.last_value_func = last_value_func
self.initial_value = initial_value
"""Initial value of last_value"""
Expand All @@ -109,14 +110,14 @@ def __init__(
self.start_out_of_range: bool = False
"""Becomes true on the first item that is out of range of `start_value`. I.e. when using `max` this is a value that is lower than `start_value`"""

self._transformers: Dict[str, IncrementalTransformer] = {}
self._transformers: Dict[str, IncrementalTransform] = {}

def _make_transformers(self) -> None:
def _make_transforms(self) -> None:
types = [("arrow", ArrowIncremental), ("json", JsonIncremental)]
for dt, kls in types:
self._transformers[dt] = kls(
self.resource_name,
self.cursor_path_p,
self.cursor_path,
self.start_value,
self.end_value,
self._cached_state,
Expand Down Expand Up @@ -170,7 +171,7 @@ def merge(self, other: "Incremental[TCursorValue]") -> "Incremental[TCursorValue
return constructor(**kwargs) # type: ignore

def on_resolved(self) -> None:
self.cursor_path_p = compile_path(self.cursor_path)
compile_path(self.cursor_path)
if self.end_value is not None and self.initial_value is None:
raise ConfigurationValueError(
"Incremental 'end_value' was specified without 'initial_value'. 'initial_value' is required when using 'end_value'."
Expand All @@ -193,7 +194,6 @@ def parse_native_representation(self, native_value: Any) -> None:
self.initial_value = native_value.initial_value
self.last_value_func = native_value.last_value_func
self.end_value = native_value.end_value
self.cursor_path_p = self.cursor_path_p
self.resource_name = self.resource_name
else: # TODO: Maybe check if callable(getattr(native_value, '__lt__', None))
# Passing bare value `incremental=44` gets parsed as initial_value
Expand All @@ -203,9 +203,6 @@ def parse_native_representation(self, native_value: Any) -> None:

def get_state(self) -> IncrementalColumnState:
"""Returns an Incremental state for a particular cursor column"""
if not self.resource_name:
raise IncrementalUnboundError(self.cursor_path)

if self.end_value is not None:
# End value uses mock state. We don't want to write it.
return {
Expand All @@ -214,6 +211,9 @@ def get_state(self) -> IncrementalColumnState:
'unique_hashes': []
}

if not self.resource_name:
raise IncrementalUnboundError(self.cursor_path)

self._cached_state = Incremental._get_state(self.resource_name, self.cursor_path)
if len(self._cached_state) == 0:
# set the default like this, setdefault evaluates the default no matter if it is needed or not. and our default is heavy
Expand All @@ -237,7 +237,7 @@ def last_value(self) -> Optional[TCursorValue]:
s = self.get_state()
return s['last_value'] # type: ignore

def _transform_item(self, transformer: IncrementalTransformer, row: TDataItem) -> Optional[TDataItem]:
def _transform_item(self, transformer: IncrementalTransform, row: TDataItem) -> Optional[TDataItem]:
row, start_out_of_range, end_out_of_range = transformer(row)
self.start_out_of_range = start_out_of_range
self.end_out_of_range = end_out_of_range
Expand Down Expand Up @@ -313,13 +313,13 @@ def bind(self, pipe: SupportsPipe) -> "Incremental[TCursorValue]":
logger.info(f"Bind incremental on {self.resource_name} with initial_value: {self.initial_value}, start_value: {self.start_value}, end_value: {self.end_value}")
# cache state
self._cached_state = self.get_state()
self._make_transformers()
self._make_transforms()
return self

def __str__(self) -> str:
return f"Incremental at {id(self)} for resource {self.resource_name} with cursor path: {self.cursor_path} initial {self.initial_value} lv_func {self.last_value_func}"

def _get_transformer(self, items: TDataItems) -> IncrementalTransformer:
def _get_transformer(self, items: TDataItems) -> IncrementalTransform:
# Assume list is all of the same type
for item in items if isinstance(items, list) else [items]:
if is_arrow_item(item):
Expand Down Expand Up @@ -397,6 +397,9 @@ def _wrap(*args: Any, **kwargs: Any) -> Any:
new_incremental = p.default.merge(explicit_value)
else:
new_incremental = explicit_value.copy()
elif explicit_value is None:
# new_incremental not set!
pass
elif isinstance(p.default, Incremental):
# Passing only initial value explicitly updates the default instance
new_incremental = p.default.copy()
Expand All @@ -408,7 +411,7 @@ def _wrap(*args: Any, **kwargs: Any) -> Any:
if is_optional_type(p.annotation):
bound_args.arguments[p.name] = None # Remove partial spec
return func(*bound_args.args, **bound_args.kwargs)
raise ValueError(f"{p.name} Incremental has no default")
raise ValueError(f"{p.name} Incremental argument has no default. Please wrap its typing in Optional[] to allow no incremental")
# pass Generic information from annotation to new_incremental
if not hasattr(new_incremental, "__orig_class__") and p.annotation and get_args(p.annotation):
new_incremental.__orig_class__ = p.annotation # type: ignore
Expand Down
49 changes: 36 additions & 13 deletions dlt/extract/incremental/transform.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
from datetime import datetime, date # noqa: I251
from typing import Optional, Tuple, List
from typing import Any, Optional, Tuple, List

try:
import pandas as pd
Expand All @@ -16,7 +16,7 @@
from dlt.common.json import json
from dlt.common import pendulum
from dlt.common.typing import TDataItem, TDataItems
from dlt.common.jsonpath import TJsonPath, find_values
from dlt.common.jsonpath import TJsonPath, find_values, JSONPathFields, compile_path
from dlt.extract.incremental.exceptions import IncrementalCursorPathMissing, IncrementalPrimaryKeyMissing
from dlt.extract.incremental.typing import IncrementalColumnState, TCursorValue, LastValueFunc
from dlt.extract.utils import resolve_column_value
Expand All @@ -28,12 +28,11 @@
pa = None



class IncrementalTransformer:
class IncrementalTransform:
def __init__(
self,
resource_name: str,
cursor_path: TJsonPath,
cursor_path: str,
start_value: Optional[TCursorValue],
end_value: Optional[TCursorValue],
incremental_state: IncrementalColumnState,
Expand All @@ -48,14 +47,22 @@ def __init__(
self.last_value_func = last_value_func
self.primary_key = primary_key

# compile jsonpath
self._compiled_cursor_path = compile_path(cursor_path)
# for simple column name we'll fallback to search in dict
if isinstance(self._compiled_cursor_path, JSONPathFields) and len(self._compiled_cursor_path.fields) == 1 and self._compiled_cursor_path.fields[0] != "*":
self.cursor_path = self._compiled_cursor_path.fields[0]
self._compiled_cursor_path = None

def __call__(
self,
row: TDataItem,
) -> Tuple[bool, bool, bool]:
...


class JsonIncremental(IncrementalTransformer):
class JsonIncremental(IncrementalTransform):

def unique_value(
self,
row: TDataItem,
Expand All @@ -72,6 +79,25 @@ def unique_value(
except KeyError as k_err:
raise IncrementalPrimaryKeyMissing(resource_name, k_err.args[0], row)

def find_cursor_value(self, row: TDataItem) -> Any:
"""Finds value in row at cursor defined by self.cursor_path.
Will use compiled JSONPath if present, otherwise it reverts to column search if row is dict
"""
row_value: Any = None
if self._compiled_cursor_path:
row_values = find_values(self._compiled_cursor_path, row)
if row_values:
row_value = row_values[0]
else:
try:
row_value = row[self.cursor_path]
except Exception:
pass
if row_value is None:
raise IncrementalCursorPathMissing(self.resource_name, self.cursor_path, row)
return row_value

def __call__(
self,
row: TDataItem,
Expand All @@ -84,10 +110,7 @@ def __call__(
if row is None:
return row, start_out_of_range, end_out_of_range

row_values = find_values(self.cursor_path, row)
if not row_values:
raise IncrementalCursorPathMissing(self.resource_name, str(self.cursor_path), row)
row_value = row_values[0]
row_value = self.find_cursor_value(row)

# For datetime cursor, ensure the value is a timezone aware datetime.
# The object saved in state will always be a tz aware pendulum datetime so this ensures values are comparable
Expand Down Expand Up @@ -137,7 +160,7 @@ def __call__(
return row, start_out_of_range, end_out_of_range


class ArrowIncremental(IncrementalTransformer):
class ArrowIncremental(IncrementalTransform):
_dlt_index = "_dlt_index"

def unique_values(
Expand Down Expand Up @@ -229,7 +252,7 @@ def __call__(


# TODO: Json path support. For now assume the cursor_path is a column name
cursor_path = str(self.cursor_path)
cursor_path = self.cursor_path
# The new max/min value
try:
orig_row_value = compute(tbl[cursor_path])
Expand All @@ -242,7 +265,7 @@ def __call__(
except KeyError as e:
raise IncrementalCursorPathMissing(
self.resource_name, cursor_path, tbl,
f"Column name {str(cursor_path)} was not found in the arrow table. Note nested JSON paths are not supported for arrow tables and dataframes, the incremental cursor_path must be a column name."
f"Column name {cursor_path} was not found in the arrow table. Not nested JSON paths are not supported for arrow tables and dataframes, the incremental cursor_path must be a column name."
) from e

# If end_value is provided, filter to include table rows that are "less" than end_value
Expand Down
6 changes: 5 additions & 1 deletion dlt/extract/source.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
InvalidTransformerDataTypeGeneratorFunctionRequired, InvalidParentResourceDataType, InvalidParentResourceIsAFunction, InvalidResourceDataType, InvalidResourceDataTypeIsNone, InvalidTransformerGeneratorFunction,
DataItemRequiredForDynamicTableHints, InvalidResourceDataTypeAsync, InvalidResourceDataTypeBasic,
InvalidResourceDataTypeMultiplePipes, ParametrizedResourceUnbound, ResourceNameMissing, ResourceNotATransformer, ResourcesNotFoundError, DeletingResourcesNotSupported)
from dlt.extract.wrappers import wrap_additional_type


def with_table_name(item: TDataItems, table_name: str) -> DataItemWithMeta:
Expand Down Expand Up @@ -91,6 +92,9 @@ def from_data(
if not name:
raise ResourceNameMissing()

# wrap additional types
data = wrap_additional_type(data)

# several iterable types are not allowed and must be excluded right away
if isinstance(data, (AsyncIterator, AsyncIterable)):
raise InvalidResourceDataTypeAsync(name, data, type(data))
Expand All @@ -109,7 +113,7 @@ def from_data(
return cls(pipe, table_schema_template, selected, incremental=incremental, section=section, args_bound=not callable(data))
else:
# some other data type that is not supported
raise InvalidResourceDataType(name, data, type(data), f"The data type is {type(data).__name__}")
raise InvalidResourceDataType(name, data, type(data), f"The data type of supplied type is {type(data).__name__}")

@property
def name(self) -> str:
Expand Down
6 changes: 1 addition & 5 deletions dlt/extract/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -124,14 +124,10 @@ def _tx_partial(item: TDataItems, meta: Any = None) -> Any:
def wrap_resource_gen(name: str, f: AnyFun, sig: inspect.Signature, *args: Any, **kwargs: Any) -> AnyFun:
"""Wraps a generator or generator function so it is evaluated on extraction"""
if inspect.isgeneratorfunction(inspect.unwrap(f)) or inspect.isgenerator(f):
# if no arguments then no wrap
# if len(sig.parameters) == 0:
# return f

# always wrap generators and generator functions. evaluate only at runtime!

def _partial() -> Any:
# print(f"_PARTIAL: {args} {kwargs} vs {args_}{kwargs_}")
# print(f"_PARTIAL: {args} {kwargs}")
return f(*args, **kwargs)

# this partial preserves the original signature and just defers the call to pipe
Expand Down
25 changes: 25 additions & 0 deletions dlt/extract/wrappers.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
from typing import Any

from dlt.common.typing import NoneType
from dlt.common.exceptions import MissingDependencyException


try:
from dlt.common.libs.pandas import pandas
from dlt.common.libs.pyarrow import pyarrow

PandaFrame, ArrowTable, ArrowRecords = pandas.DataFrame, pyarrow.Table, pyarrow.RecordBatch
except MissingDependencyException:
PandaFrame, ArrowTable, ArrowRecords = NoneType, NoneType, NoneType


def wrap_additional_type(data: Any) -> Any:
"""Wraps any known additional type so it is accepted by DltResource"""
# pass through None: if optional deps are not defined, they fallback to None type
if data is None:
return data

if isinstance(data, (PandaFrame, ArrowTable, ArrowRecords)):
return [data]

return data
Loading

0 comments on commit 14d28a8

Please sign in to comment.