Skip to content

Commit

Permalink
bug: support lag for non-UTC datetime cursor fields
Browse files Browse the repository at this point in the history
- Add `ensure_pendulum_datetime_non_utc` to parse datetime strings into non-UTC datetime objects.
- Add `_datetime_obj_to_str` to preserve the colon in the timezone when converting datetime objects back to strings.
- Skip writing back state if no valid rows are found for `last_value` in the transformer, which may otherwise cause incorrect behavior.
  • Loading branch information
hairrrrr committed Dec 23, 2024
1 parent 5a1cb69 commit ead27f8
Show file tree
Hide file tree
Showing 5 changed files with 114 additions and 12 deletions.
41 changes: 37 additions & 4 deletions dlt/common/time.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import contextlib
import datetime # noqa: I251
import re
import sys
from typing import Any, Optional, Union, overload, TypeVar, Callable # noqa

from pendulum.parsing import (
Expand Down Expand Up @@ -125,6 +126,38 @@ def ensure_pendulum_datetime(value: TAnyDateTime) -> pendulum.DateTime:
raise TypeError(f"Cannot coerce {value} to a pendulum.DateTime object.")


def ensure_pendulum_datetime_non_utc(value: TAnyDateTime) -> pendulum.DateTime:
if isinstance(value, datetime.datetime):
ret = pendulum.instance(value)
return ret
elif isinstance(value, datetime.date):
return pendulum.datetime(value.year, value.month, value.day)
elif isinstance(value, (int, float, str)):
result = _datetime_from_ts_or_iso(value)
if isinstance(result, datetime.time):
raise ValueError(f"Cannot coerce {value} to a pendulum.DateTime object.")
if isinstance(result, pendulum.DateTime):
return result
return pendulum.datetime(result.year, result.month, result.day)
raise TypeError(f"Cannot coerce {value} to a pendulum.DateTime object.")


def datatime_obj_to_str(
datatime: Union[datetime.datetime, datetime.date], datetime_format: str
) -> str:
if sys.version_info < (3, 12, 0) and "%:z" in datetime_format:
modified_format = datetime_format.replace("%:z", "%z")
datetime_str = datatime.strftime(modified_format)

timezone_part = datetime_str[-5:] if len(datetime_str) >= 5 else ""
if timezone_part.startswith(("-", "+")):
return f"{datetime_str[:-5]}{timezone_part[:3]}:{timezone_part[3:]}"

raise ValueError(f"Invalid timezone format in datetime string: {datetime_str}")

return datatime.strftime(datetime_format)


def ensure_pendulum_time(value: Union[str, datetime.time]) -> pendulum.Time:
"""Coerce a time value to a `pendulum.Time` object.
Expand Down Expand Up @@ -164,27 +197,27 @@ def detect_datetime_format(value: str) -> Optional[str]:
): "%Y-%m-%dT%H:%M:%S.%fZ", # UTC with fractional seconds
re.compile(
r"^\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2}\+\d{2}:\d{2}$"
): "%Y-%m-%dT%H:%M:%S%z", # Positive timezone offset
): "%Y-%m-%dT%H:%M:%S%:z", # Positive timezone offset
re.compile(
r"^\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2}\+\d{4}$"
): "%Y-%m-%dT%H:%M:%S%z", # Positive timezone without colon
# Full datetime with fractional seconds and positive timezone offset
re.compile(
r"^\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2}\.\d+\+\d{2}:\d{2}$"
): "%Y-%m-%dT%H:%M:%S.%f%z",
): "%Y-%m-%dT%H:%M:%S.%f%:z",
re.compile(
r"^\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2}\.\d+\+\d{4}$"
): "%Y-%m-%dT%H:%M:%S.%f%z", # Positive timezone without colon
re.compile(
r"^\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2}-\d{2}:\d{2}$"
): "%Y-%m-%dT%H:%M:%S%z", # Negative timezone offset
): "%Y-%m-%dT%H:%M:%S%:z", # Negative timezone offset
re.compile(
r"^\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2}-\d{4}$"
): "%Y-%m-%dT%H:%M:%S%z", # Negative timezone without colon
# Full datetime with fractional seconds and negative timezone offset
re.compile(
r"^\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2}\.\d+-\d{2}:\d{2}$"
): "%Y-%m-%dT%H:%M:%S.%f%z",
): "%Y-%m-%dT%H:%M:%S.%f%:z",
re.compile(
r"^\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2}\.\d+-\d{4}$"
): "%Y-%m-%dT%H:%M:%S.%f%z", # Negative Timezone without colon
Expand Down
9 changes: 8 additions & 1 deletion dlt/extract/incremental/__init__.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import os
from datetime import datetime # noqa: I251
from typing import Generic, ClassVar, Any, Optional, Type, Dict, Union, Literal, Tuple

from typing_extensions import get_args

import inspect
Expand Down Expand Up @@ -560,8 +561,14 @@ def __call__(self, rows: TDataItems, meta: Any = None) -> Optional[TDataItems]:
else:
rows = self._transform_item(transformer, rows)

# write back state
# ensure last_value maintains forward-only progression when lag is applied
if self.lag and (cached_last_value := self._cached_state.get("last_value")):
transformer.last_value = self.last_value_func( # type: ignore[call-arg]
(transformer.last_value, cached_last_value)
)
# writing back state
self._cached_state["last_value"] = transformer.last_value

if not transformer.deduplication_disabled:
# compute hashes for new last rows
unique_hashes = set(
Expand Down
11 changes: 8 additions & 3 deletions dlt/extract/incremental/lag.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,13 @@
import sys
from datetime import datetime, timedelta, date # noqa: I251
from typing import Union

from dlt.common import logger
from dlt.common.time import ensure_pendulum_datetime, detect_datetime_format
from dlt.common.time import (
detect_datetime_format,
ensure_pendulum_datetime_non_utc,
datatime_obj_to_str,
)

from . import TCursorValue, LastValueFunc

Expand All @@ -17,12 +22,12 @@ def _apply_lag_to_value(
is_str = isinstance(value, str)
value_format = detect_datetime_format(value) if is_str else None
is_str_date = value_format in ("%Y%m%d", "%Y-%m-%d") if value_format else None
parsed_value = ensure_pendulum_datetime(value) if is_str else value
parsed_value = ensure_pendulum_datetime_non_utc(value) if is_str else value

if isinstance(parsed_value, (datetime, date)):
parsed_value = _apply_lag_to_datetime(lag, parsed_value, last_value_func, is_str_date) # type: ignore[assignment]
# go back to string or pass exact type
value = parsed_value.strftime(value_format) if value_format else parsed_value # type: ignore[assignment]
value = datatime_obj_to_str(parsed_value, value_format) if value_format else parsed_value # type: ignore[assignment]

elif isinstance(parsed_value, (int, float)):
value = _apply_lag_to_number(lag, parsed_value, last_value_func) # type: ignore[assignment]
Expand Down
32 changes: 28 additions & 4 deletions tests/common/test_time.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,10 @@
datetime_to_timestamp,
datetime_to_timestamp_ms,
detect_datetime_format,
ensure_pendulum_datetime_non_utc,
)
from dlt.common.typing import TAnyDateTime
from dlt.common.time import datatime_obj_to_str


def test_timestamp_within() -> None:
Expand Down Expand Up @@ -132,21 +134,21 @@ def test_datetime_to_timestamp_helpers(
[
("2024-10-20T15:30:00Z", "%Y-%m-%dT%H:%M:%SZ"), # UTC 'Z'
("2024-10-20T15:30:00.123456Z", "%Y-%m-%dT%H:%M:%S.%fZ"), # UTC 'Z' with fractional seconds
("2024-10-20T15:30:00+02:00", "%Y-%m-%dT%H:%M:%S%z"), # Positive timezone offset
("2024-10-20T15:30:00+02:00", "%Y-%m-%dT%H:%M:%S%:z"), # Positive timezone offset
("2024-10-20T15:30:00+0200", "%Y-%m-%dT%H:%M:%S%z"), # Positive timezone offset (no colon)
(
"2024-10-20T15:30:00.123456+02:00",
"%Y-%m-%dT%H:%M:%S.%f%z",
"%Y-%m-%dT%H:%M:%S.%f%:z",
), # Positive timezone offset with fractional seconds
(
"2024-10-20T15:30:00.123456+0200",
"%Y-%m-%dT%H:%M:%S.%f%z",
), # Positive timezone offset with fractional seconds (no colon)
("2024-10-20T15:30:00-02:00", "%Y-%m-%dT%H:%M:%S%z"), # Negative timezone offset
("2024-10-20T15:30:00-02:00", "%Y-%m-%dT%H:%M:%S%:z"), # Negative timezone offset
("2024-10-20T15:30:00-0200", "%Y-%m-%dT%H:%M:%S%z"), # Negative timezone offset (no colon)
(
"2024-10-20T15:30:00.123456-02:00",
"%Y-%m-%dT%H:%M:%S.%f%z",
"%Y-%m-%dT%H:%M:%S.%f%:z",
), # Negative timezone offset with fractional seconds
(
"2024-10-20T15:30:00.123456-0200",
Expand All @@ -170,6 +172,28 @@ def test_detect_datetime_format(value, expected_format) -> None:
assert ensure_pendulum_datetime(value) is not None


@pytest.mark.parametrize(
"datetime_str, datetime_format, expected_value",
[
("2024-10-20T15:30:00+02:00", "%Y-%m-%dT%H:%M:%S%:z", "2024-10-20T15:30:00+02:00"),
("2024-10-20T15:30:00+0200", "%Y-%m-%dT%H:%M:%S%z", "2024-10-20T15:30:00+0200"),
(
"2024-10-20T15:30:00.123456-02:00",
"%Y-%m-%dT%H:%M:%S.%f%:z",
"2024-10-20T15:30:00.123456-02:00",
),
(
"2024-10-20T15:30:00.123456-0200",
"%Y-%m-%dT%H:%M:%S.%f%z",
"2024-10-20T15:30:00.123456-0200",
),
],
)
def test_datatime_obj_to_str(datetime_str, datetime_format, expected_value) -> None:
datetime = ensure_pendulum_datetime_non_utc(datetime_str)
assert datatime_obj_to_str(datetime, datetime_format) == expected_value


@pytest.mark.parametrize(
"value",
[
Expand Down
33 changes: 33 additions & 0 deletions tests/extract/test_incremental.py
Original file line number Diff line number Diff line change
Expand Up @@ -606,6 +606,39 @@ def some_data(created_at=dlt.sources.incremental("created_at", initial_value)):
assert s["last_value"] == initial_value + timedelta(minutes=4)


@pytest.mark.parametrize("item_type", ALL_TEST_DATA_ITEM_FORMATS)
def test_incremental_transform_return_empty_rows_with_lag(item_type: TestDataItemFormat) -> None:
@dlt.resource
def some_data(
created_at=dlt.sources.incremental(
"created_at", initial_value="2024-11-01T08:00:00+08:00", lag=3600
)
):
yield from source_items

p = dlt.pipeline(pipeline_name=uniq_id())

first_run_data = [{"id": 1, "value": 10, "created_at": "2024-11-01T12:00:00+08:00"}]
source_items = data_to_item_format(item_type, first_run_data)

p.extract(some_data())
s = p.state["sources"][p.default_schema_name]["resources"]["some_data"]["incremental"][
"created_at"
]

assert s["last_value"] == "2024-11-01T12:00:00+08:00"

second_run_data = [{"id": 1, "value": 10, "created_at": "2024-11-01T10:00:00+08:00"}]
source_items = data_to_item_format(item_type, second_run_data)

p.extract(some_data())
s = p.state["sources"][p.default_schema_name]["resources"]["some_data"]["incremental"][
"created_at"
]

assert s["last_value"] == "2024-11-01T12:00:00+08:00"


@pytest.mark.parametrize("item_type", ALL_TEST_DATA_ITEM_FORMATS)
def test_descending_order_unique_hashes(item_type: TestDataItemFormat) -> None:
"""Resource returns items in descending order but using `max` last value function.
Expand Down

0 comments on commit ead27f8

Please sign in to comment.