
Commit

Mockup incremental implementation for arrow tables
steinitzu committed Oct 10, 2023
1 parent 7d0f648 commit 1845630
Showing 5 changed files with 317 additions and 84 deletions.
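For context: the commit title says this is a mockup of incremental support for arrow tables. A rough usage sketch of the flow the commit works toward, assuming the standard `dlt` resource API — the table schema, column names and pipeline name below are illustrative only, not taken from this commit:

```python
import dlt
import pyarrow as pa


@dlt.resource(primary_key="id")
def events(updated_at=dlt.sources.incremental("updated_at")):
    # In practice this table would come from a database, parquet file, etc.
    # "updated_at" is the cursor column tracked by the incremental state.
    yield pa.table({
        "id": [1, 2, 3],
        "updated_at": ["2023-10-01", "2023-10-02", "2023-10-03"],
    })


# On subsequent runs, rows whose cursor value is not newer than the stored
# last_value would be filtered out before loading.
pipeline = dlt.pipeline(pipeline_name="arrow_incremental_demo", destination="duckdb")
pipeline.run(events())
```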
108 changes: 24 additions & 84 deletions dlt/extract/incremental.py → dlt/extract/incremental/__init__.py
@@ -17,39 +17,17 @@
from dlt.common.data_types.type_helpers import coerce_from_date_types, coerce_value, py_type_to_sc_type

from dlt.extract.exceptions import IncrementalUnboundError, PipeException
from dlt.extract.incremental.exceptions import IncrementalCursorPathMissing, IncrementalPrimaryKeyMissing
from dlt.extract.incremental.typing import IncrementalColumnState, TCursorValue, LastValueFunc
from dlt.extract.pipe import Pipe
from dlt.extract.utils import resolve_column_value
from dlt.extract.typing import FilterItem, SupportsPipe, TTableHintTemplate
from dlt.extract.typing import SupportsPipe, TTableHintTemplate, MapItem, YieldMapItem, FilterItem
from dlt.extract.incremental.transform import get_transformer


TCursorValue = TypeVar("TCursorValue", bound=Any)
LastValueFunc = Callable[[Sequence[TCursorValue]], Any]


class IncrementalColumnState(TypedDict):
initial_value: Optional[Any]
last_value: Optional[Any]
unique_hashes: List[str]


class IncrementalCursorPathMissing(PipeException):
def __init__(self, pipe_name: str, json_path: str, item: TDataItem) -> None:
self.json_path = json_path
self.item = item
msg = f"Cursor element with JSON path {json_path} was not found in extracted data item. All data items must contain this path. Use the same names of fields as in your JSON document - if those are different from the names you see in database."
super().__init__(pipe_name, msg)


class IncrementalPrimaryKeyMissing(PipeException):
def __init__(self, pipe_name: str, primary_key_column: str, item: TDataItem) -> None:
self.primary_key_column = primary_key_column
self.item = item
msg = f"Primary key column {primary_key_column} was not found in extracted data item. All data items must contain this column. Use the same names of fields as in your JSON document."
super().__init__(pipe_name, msg)


@configspec
class Incremental(FilterItem, BaseConfiguration, Generic[TCursorValue]):
class Incremental(YieldMapItem, BaseConfiguration, Generic[TCursorValue]):
"""Adds incremental extraction for a resource by storing a cursor value in persistent state.
The cursor could for example be a timestamp for when the record was created and you can use this to load only
@@ -244,62 +222,20 @@ def unique_value(self, row: TDataItem) -> str:
except KeyError as k_err:
raise IncrementalPrimaryKeyMissing(self.resource_name, k_err.args[0], row)

def transform(self, row: TDataItem) -> bool:
def transform(self, row: TDataItem) -> TDataItem:
if row is None:
return True

row_values = find_values(self.cursor_path_p, row)
if not row_values:
raise IncrementalCursorPathMissing(self.resource_name, self.cursor_path, row)
row_value = row_values[0]

# For datetime cursor, ensure the value is a timezone aware datetime.
# The object saved in state will always be a tz aware pendulum datetime so this ensures values are comparable
if isinstance(row_value, datetime):
row_value = pendulum.instance(row_value)

incremental_state = self._cached_state
last_value = incremental_state['last_value']
last_value_func = self.last_value_func

# Check whether end_value has been reached
# Filter end value ranges exclusively, so in case of "max" function we remove values >= end_value
if self.end_value is not None and (
last_value_func((row_value, self.end_value)) != self.end_value or last_value_func((row_value, )) == self.end_value
):
self.end_out_of_range = True
return False

check_values = (row_value,) + ((last_value, ) if last_value is not None else ())
new_value = last_value_func(check_values)
if last_value == new_value:
processed_row_value = last_value_func((row_value, ))
# we store row id for all records with the current "last_value" in state and use it to deduplicate
if processed_row_value == last_value:
unique_value = self.unique_value(row)
# if unique value exists then use it to deduplicate
if unique_value:
if unique_value in incremental_state['unique_hashes']:
return False
# add new hash only if the record row id is same as current last value
incremental_state['unique_hashes'].append(unique_value)
return True
# skip the record that is not a last_value or new_value: that record was already processed
check_values = (row_value,) + ((self.start_value,) if self.start_value is not None else ())
new_value = last_value_func(check_values)
# Include rows == start_value but exclude "lower"
if new_value == self.start_value and processed_row_value != self.start_value:
self.start_out_of_range = True
return False
else:
return True
else:
incremental_state["last_value"] = new_value
unique_value = self.unique_value(row)
if unique_value:
incremental_state["unique_hashes"] = [unique_value]
yield row
return

return True
transformer = get_transformer(row)

row, start_out_of_range, end_out_of_range = transformer(
row, self.resource_name, self.cursor_path_p, self.start_value, self.end_value, self._cached_state, self.last_value_func, self.primary_key
)
self.start_out_of_range = start_out_of_range
self.end_out_of_range = end_out_of_range
if row is not None:
yield row

def get_incremental_value_type(self) -> Type[Any]:
"""Infers the type of incremental value from a class of an instance if those preserve the Generic arguments information."""
@@ -377,7 +313,7 @@ def __str__(self) -> str:
return f"Incremental at {id(self)} for resource {self.resource_name} with cursor path: {self.cursor_path} initial {self.initial_value} lv_func {self.last_value_func}"


class IncrementalResourceWrapper(FilterItem):
class IncrementalResourceWrapper(YieldMapItem):
_incremental: Optional[Incremental[Any]] = None
"""Keeps the injectable incremental"""
_resource_name: str = None
@@ -485,7 +421,11 @@ def bind(self, pipe: SupportsPipe) -> "IncrementalResourceWrapper":

def __call__(self, item: TDataItems, meta: Any = None) -> Optional[TDataItems]:
if not self._incremental:
return item
yield item
return
if self._incremental.primary_key is None:
self._incremental.primary_key = self.primary_key
return self._incremental(item, meta)
if isinstance(item, list):
yield list(self._incremental(item, meta))
else:
yield self._incremental(item, meta)
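The new `transform` delegates the per-item logic to `get_transformer` from `dlt.extract.incremental.transform`, a module that is not part of the files shown here. A minimal sketch of how such a dispatcher could look, kept consistent with the call signature used above; the helper names and their elided bodies are assumptions for illustration, not the actual module contents:

```python
from typing import Callable, Tuple

import pyarrow as pa

from dlt.common.typing import TDataItem

# A transformer receives the item plus cursor/state arguments and returns
# (possibly filtered item, start_out_of_range, end_out_of_range).
TransformerFunc = Callable[..., Tuple[TDataItem, bool, bool]]


def json_incremental_transform(
    item, resource_name, cursor_path, start_value, end_value, cached_state, last_value_func, primary_key
) -> Tuple[TDataItem, bool, bool]:
    ...  # the existing row-by-row filtering and dedup logic would live here


def arrow_incremental_transform(
    item, resource_name, cursor_path, start_value, end_value, cached_state, last_value_func, primary_key
) -> Tuple[TDataItem, bool, bool]:
    ...  # vectorized filtering on the cursor column, e.g. via pyarrow.compute


def get_transformer(item: TDataItem) -> TransformerFunc:
    # Dispatch on the physical type of the data item: arrow tables and record
    # batches get the vectorized path, everything else the row-based path.
    if isinstance(item, (pa.Table, pa.RecordBatch)):
        return arrow_incremental_transform
    return json_incremental_transform
```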
18 changes: 18 additions & 0 deletions dlt/extract/incremental/exceptions.py
@@ -0,0 +1,18 @@
from dlt.extract.exceptions import PipeException
from dlt.common.typing import TDataItem


class IncrementalCursorPathMissing(PipeException):
def __init__(self, pipe_name: str, json_path: str, item: TDataItem) -> None:
self.json_path = json_path
self.item = item
msg = f"Cursor element with JSON path {json_path} was not found in extracted data item. All data items must contain this path. Use the same names of fields as in your JSON document - if those are different from the names you see in database."
super().__init__(pipe_name, msg)


class IncrementalPrimaryKeyMissing(PipeException):
def __init__(self, pipe_name: str, primary_key_column: str, item: TDataItem) -> None:
self.primary_key_column = primary_key_column
self.item = item
msg = f"Primary key column {primary_key_column} was not found in extracted data item. All data items must contain this column. Use the same names of fields as in your JSON document."
super().__init__(pipe_name, msg)
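Both exceptions are raised from the incremental transform when an extracted item is missing the configured cursor path or primary key column. A rough illustration of the cursor-path failure mode (resource and field names are made up, and depending on the dlt version the error may surface wrapped in a higher-level extraction exception rather than directly):

```python
import dlt


@dlt.resource
def users(updated_at=dlt.sources.incremental("updated_at")):
    # The yielded record has no "updated_at" key, so the cursor path cannot
    # be resolved and IncrementalCursorPathMissing is raised during extraction
    # (possibly wrapped by the pipe machinery).
    yield {"id": 1, "name": "alice"}


for row in users():
    print(row)
```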
