Skip to content

Commit

Permalink
code formatting & docs update
Browse files Browse the repository at this point in the history
  • Loading branch information
willi-mueller committed Aug 2, 2024
1 parent 2eb2928 commit c3e63e8
Show file tree
Hide file tree
Showing 4 changed files with 73 additions and 23 deletions.
4 changes: 3 additions & 1 deletion dlt/extract/incremental/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -142,7 +142,9 @@ def __init__(
self.row_order = row_order
self.allow_external_schedulers = allow_external_schedulers
if on_cursor_value_missing not in ["raise", "include", "exclude"]:
raise ValueError(f"Unexpected argument for on_cursor_value_none. Got {on_cursor_value_missing}")
raise ValueError(
f"Unexpected argument for on_cursor_value_missing. Got {on_cursor_value_missing}"
)
self.on_cursor_value_missing = on_cursor_value_missing

self._cached_state: IncrementalColumnState = None
Expand Down
13 changes: 9 additions & 4 deletions dlt/extract/incremental/transform.py
Original file line number Diff line number Diff line change
Expand Up @@ -129,7 +129,9 @@ def find_cursor_value(self, row: TDataItem) -> Any:
return None
elif None in cursor_values:
if self.on_cursor_value_missing == "raise":
raise IncrementalCursorPathHasValueNone(self.resource_name, self.cursor_path, row)
raise IncrementalCursorPathHasValueNone(
self.resource_name, self.cursor_path, row
)
elif self.on_cursor_value_missing == "exclude":
return None

Expand All @@ -140,15 +142,18 @@ def find_cursor_value(self, row: TDataItem) -> Any:
if row_value is None:
if self.on_cursor_value_missing == "raise":
if self.cursor_path not in row.keys():
raise IncrementalCursorPathMissing(self.resource_name, self.cursor_path, row)
raise IncrementalCursorPathMissing(
self.resource_name, self.cursor_path, row
)
else:
raise IncrementalCursorPathHasValueNone(self.resource_name, self.cursor_path, row)
raise IncrementalCursorPathHasValueNone(
self.resource_name, self.cursor_path, row
)
elif self.on_cursor_value_missing == "exclude":
return None

return row_value


def __call__(
self,
row: TDataItem,
Expand Down
44 changes: 34 additions & 10 deletions docs/website/docs/general-usage/incremental-loading.md
Original file line number Diff line number Diff line change
Expand Up @@ -878,34 +878,58 @@ Consider the example below for reading incremental loading parameters from "conf
```
`id_after` incrementally stores the latest `cursor_path` value for future pipeline runs.

### Loading NULL values in the incremental cursor field
### Loading when incremental cursor path is missing or value is None/NULL

You can customize the incremental processing of dlt by setting the parameter `on_cursor_value_missing`.

When loading incrementally, dlt makes two assumptions:
1. Each row contains the cursor path.
2. Each row contains a value at the cursor path that is not `None`.

When loading incrementally with a cursor field, each row is expected to contain a value at the cursor field that is not `None`.
For example, each of the following two resources will raise an error:
```py
@dlt.resource
def some_data(updated_at=dlt.sources.incremental("updated_at")):
def some_data_without_cursor_path(updated_at=dlt.sources.incremental("updated_at")):
yield [
{"id": 1, "created_at": 1, "updated_at": 1},
{"id": 2, "created_at": 2, "updated_at": 2},
{"id": 3, "created_at": 4, "updated_at": None},
{"id": 2, "created_at": 2}, # cursor field is missing
]

list(some_data())
list(some_data_without_cursor_path())

@dlt.resource
def some_data_without_cursor_value(updated_at=dlt.sources.incremental("updated_at")):
yield [
{"id": 1, "created_at": 1, "updated_at": 1},
{"id": 3, "created_at": 4, "updated_at": None}, # value at cursor field is None
]

list(some_data_without_cursor_value())
```

If you want to load data that includes `None` values there are two options:
If you want to load data that includes rows without the cursor path or with `None` values at the cursor path, there are two options:

1. Transform the values at the incremental cursor to a value different from `None` before the incremental object is called. [See docs below](#transform-records-before-incremental-processing)
2. Configure the incremental load to tolerate `None` values using `incremental(..., on_cursor_value_none="include")`.
1. Before the incremental processing begins, ensure that the incremental field is present and transform the values at the incremental cursor to a value different from `None`. [See docs below](#transform-records-before-incremental-processing)
2. Configure the incremental load to tolerate the missing cursor path and `None` values using `incremental(..., on_cursor_value_missing="include")`.

Example:
```py
@dlt.resource
def some_data(updated_at=dlt.sources.incremental("updated_at", on_cursor_value_missing="include")):
yield [
{"id": 1, "created_at": 1, "updated_at": 1},
{"id": 2, "created_at": 2},
{"id": 3, "created_at": 4, "updated_at": None},
]

list(some_data())
result = list(some_data())
assert len(result) == 3
assert result[1] == {"id": 2, "created_at": 2}
assert result[2] == {"id": 3, "created_at": 4, "updated_at": None}
```

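Similarly, when the cursor path is missing or its value is `None`, you can drop such rows instead of loading them by setting `on_cursor_value_missing="exclude"`.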

### Transform records before incremental processing
If you want to load data that includes `None` values, you can transform the records before the incremental processing.
You can add steps to the pipeline that [filter, transform, or pivot your data](../general-usage/resource.md#filter-transform-and-pivot-data).
Expand Down
35 changes: 27 additions & 8 deletions tests/extract/test_incremental.py
Original file line number Diff line number Diff line change
Expand Up @@ -652,7 +652,9 @@ def test_cursor_path_none_includes_records_and_updates_incremental_cursor_1(
source_items = data_to_item_format(item_type, data)

@dlt.resource
def some_data(created_at=dlt.sources.incremental("created_at", on_cursor_value_missing="include")):
def some_data(
created_at=dlt.sources.incremental("created_at", on_cursor_value_missing="include")
):
yield source_items

p = dlt.pipeline(pipeline_name=uniq_id())
Expand All @@ -678,7 +680,9 @@ def test_cursor_path_none_includes_records_and_updates_incremental_cursor_2(
source_items = data_to_item_format(item_type, data)

@dlt.resource
def some_data(created_at=dlt.sources.incremental("created_at", on_cursor_value_missing="include")):
def some_data(
created_at=dlt.sources.incremental("created_at", on_cursor_value_missing="include")
):
yield source_items

p = dlt.pipeline(pipeline_name=uniq_id())
Expand All @@ -704,7 +708,9 @@ def test_cursor_path_none_includes_records_and_updates_incremental_cursor_3(
source_items = data_to_item_format(item_type, data)

@dlt.resource
def some_data(created_at=dlt.sources.incremental("created_at", on_cursor_value_missing="include")):
def some_data(
created_at=dlt.sources.incremental("created_at", on_cursor_value_missing="include")
):
yield source_items

p = dlt.pipeline(pipeline_name=uniq_id())
Expand All @@ -716,6 +722,7 @@ def some_data(created_at=dlt.sources.incremental("created_at", on_cursor_value_m
]
assert s["last_value"] == 2


@pytest.mark.parametrize("item_type", ALL_TEST_DATA_ITEM_FORMATS)
def test_cursor_path_none_includes_records_without_cursor_path(
item_type: TestDataItemFormat,
Expand All @@ -727,7 +734,9 @@ def test_cursor_path_none_includes_records_without_cursor_path(
source_items = data_to_item_format(item_type, data)

@dlt.resource
def some_data(created_at=dlt.sources.incremental("created_at", on_cursor_value_missing="include")):
def some_data(
created_at=dlt.sources.incremental("created_at", on_cursor_value_missing="include")
):
yield source_items

p = dlt.pipeline(pipeline_name=uniq_id())
Expand All @@ -739,6 +748,7 @@ def some_data(created_at=dlt.sources.incremental("created_at", on_cursor_value_m
]
assert s["last_value"] == 1


@pytest.mark.parametrize("item_type", ["object"])
def test_cursor_path_none_excludes_records_and_updates_incremental_cursor(
item_type: TestDataItemFormat,
Expand All @@ -751,7 +761,9 @@ def test_cursor_path_none_excludes_records_and_updates_incremental_cursor(
source_items = data_to_item_format(item_type, data)

@dlt.resource
def some_data(created_at=dlt.sources.incremental("created_at", on_cursor_value_missing="exclude")):
def some_data(
created_at=dlt.sources.incremental("created_at", on_cursor_value_missing="exclude")
):
yield source_items

p = dlt.pipeline(pipeline_name=uniq_id())
Expand All @@ -773,7 +785,9 @@ def test_cursor_path_none_can_raise_on_none() -> None:
]

@dlt.resource
def some_data(created_at=dlt.sources.incremental("created_at", on_cursor_value_missing="raise")):
def some_data(
created_at=dlt.sources.incremental("created_at", on_cursor_value_missing="raise")
):
yield source_items

with pytest.raises(IncrementalCursorPathHasValueNone) as py_ex:
Expand All @@ -793,7 +807,9 @@ def test_cursor_path_none_nested_can_raise_on_none_1() -> None:
# No nested json path support for pandas and arrow. See test_nested_cursor_path_arrow_fails
@dlt.resource
def some_data(
created_at=dlt.sources.incremental("data.items[0].created_at", on_cursor_value_missing="raise")
created_at=dlt.sources.incremental(
"data.items[0].created_at", on_cursor_value_missing="raise"
)
):
yield {"data": {"items": [{"created_at": None}, {"created_at": 1}]}}

Expand All @@ -806,7 +822,9 @@ def test_cursor_path_none_nested_can_raise_on_none_2() -> None:
# No pandas and arrow. See test_nested_cursor_path_arrow_fails
@dlt.resource
def some_data(
created_at=dlt.sources.incremental("data.items[*].created_at", on_cursor_value_missing="raise")
created_at=dlt.sources.incremental(
"data.items[*].created_at", on_cursor_value_missing="raise"
)
):
yield {"data": {"items": [{"created_at": None}, {"created_at": 1}]}}

Expand Down Expand Up @@ -901,6 +919,7 @@ def some_data(

assert_query_data(p, "select count(*) from some_data__data__items", [2])


@pytest.mark.parametrize("item_type", ALL_TEST_DATA_ITEM_FORMATS)
def test_set_default_value_for_incremental_cursor(item_type: TestDataItemFormat) -> None:
@dlt.resource
Expand Down

0 comments on commit c3e63e8

Please sign in to comment.