From ead27f8d53520223d322836b8d271848de138405 Mon Sep 17 00:00:00 2001 From: Xiaochuan Wang Date: Sat, 21 Dec 2024 01:45:44 +0800 Subject: [PATCH] bug: support lag for non-UTC datetime cursor fields - Add `ensure_pendulum_datetime_non_utc` to parse datetime strings into non-UTC datetime objects. - Add `_datetime_obj_to_str` to preserve the colon in the timezone when converting datetime objects back to strings. - Skip writing back state if no valid rows are found for `last_value` in the transformer, which may otherwise cause incorrect behavior. --- dlt/common/time.py | 41 ++++++++++++++++++++++++++--- dlt/extract/incremental/__init__.py | 9 ++++++- dlt/extract/incremental/lag.py | 11 +++++--- tests/common/test_time.py | 32 +++++++++++++++++++--- tests/extract/test_incremental.py | 33 +++++++++++++++++++++++ 5 files changed, 114 insertions(+), 12 deletions(-) diff --git a/dlt/common/time.py b/dlt/common/time.py index 74c32e4ea0..065d2697ab 100644 --- a/dlt/common/time.py +++ b/dlt/common/time.py @@ -1,6 +1,7 @@ import contextlib import datetime # noqa: I251 import re +import sys from typing import Any, Optional, Union, overload, TypeVar, Callable # noqa from pendulum.parsing import ( @@ -125,6 +126,38 @@ def ensure_pendulum_datetime(value: TAnyDateTime) -> pendulum.DateTime: raise TypeError(f"Cannot coerce {value} to a pendulum.DateTime object.") +def ensure_pendulum_datetime_non_utc(value: TAnyDateTime) -> pendulum.DateTime: + if isinstance(value, datetime.datetime): + ret = pendulum.instance(value) + return ret + elif isinstance(value, datetime.date): + return pendulum.datetime(value.year, value.month, value.day) + elif isinstance(value, (int, float, str)): + result = _datetime_from_ts_or_iso(value) + if isinstance(result, datetime.time): + raise ValueError(f"Cannot coerce {value} to a pendulum.DateTime object.") + if isinstance(result, pendulum.DateTime): + return result + return pendulum.datetime(result.year, result.month, result.day) + raise TypeError(f"Cannot coerce {value} to a pendulum.DateTime object.") + + +def datatime_obj_to_str( + datatime: Union[datetime.datetime, datetime.date], datetime_format: str +) -> str: + if sys.version_info < (3, 12, 0) and "%:z" in datetime_format: + modified_format = datetime_format.replace("%:z", "%z") + datetime_str = datatime.strftime(modified_format) + + timezone_part = datetime_str[-5:] if len(datetime_str) >= 5 else "" + if timezone_part.startswith(("-", "+")): + return f"{datetime_str[:-5]}{timezone_part[:3]}:{timezone_part[3:]}" + + raise ValueError(f"Invalid timezone format in datetime string: {datetime_str}") + + return datatime.strftime(datetime_format) + + def ensure_pendulum_time(value: Union[str, datetime.time]) -> pendulum.Time: """Coerce a time value to a `pendulum.Time` object. @@ -164,27 +197,27 @@ def detect_datetime_format(value: str) -> Optional[str]: ): "%Y-%m-%dT%H:%M:%S.%fZ", # UTC with fractional seconds re.compile( r"^\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2}\+\d{2}:\d{2}$" - ): "%Y-%m-%dT%H:%M:%S%z", # Positive timezone offset + ): "%Y-%m-%dT%H:%M:%S%:z", # Positive timezone offset re.compile( r"^\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2}\+\d{4}$" ): "%Y-%m-%dT%H:%M:%S%z", # Positive timezone without colon # Full datetime with fractional seconds and positive timezone offset re.compile( r"^\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2}\.\d+\+\d{2}:\d{2}$" - ): "%Y-%m-%dT%H:%M:%S.%f%z", + ): "%Y-%m-%dT%H:%M:%S.%f%:z", re.compile( r"^\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2}\.\d+\+\d{4}$" ): "%Y-%m-%dT%H:%M:%S.%f%z", # Positive timezone without colon re.compile( r"^\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2}-\d{2}:\d{2}$" - ): "%Y-%m-%dT%H:%M:%S%z", # Negative timezone offset + ): "%Y-%m-%dT%H:%M:%S%:z", # Negative timezone offset re.compile( r"^\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2}-\d{4}$" ): "%Y-%m-%dT%H:%M:%S%z", # Negative timezone without colon # Full datetime with fractional seconds and negative timezone offset re.compile( r"^\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2}\.\d+-\d{2}:\d{2}$" - ): "%Y-%m-%dT%H:%M:%S.%f%z", + ): "%Y-%m-%dT%H:%M:%S.%f%:z", re.compile( r"^\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2}\.\d+-\d{4}$" ): "%Y-%m-%dT%H:%M:%S.%f%z", # Negative Timezone without colon diff --git a/dlt/extract/incremental/__init__.py b/dlt/extract/incremental/__init__.py index ce06292864..2b9ea8fbf1 100644 --- a/dlt/extract/incremental/__init__.py +++ b/dlt/extract/incremental/__init__.py @@ -1,6 +1,7 @@ import os from datetime import datetime # noqa: I251 from typing import Generic, ClassVar, Any, Optional, Type, Dict, Union, Literal, Tuple + from typing_extensions import get_args import inspect @@ -560,8 +561,14 @@ def __call__(self, rows: TDataItems, meta: Any = None) -> Optional[TDataItems]: else: rows = self._transform_item(transformer, rows) - # write back state + # ensure last_value maintains forward-only progression when lag is applied + if self.lag and (cached_last_value := self._cached_state.get("last_value")): + transformer.last_value = self.last_value_func( # type: ignore[call-arg] + (transformer.last_value, cached_last_value) + ) + # writing back state self._cached_state["last_value"] = transformer.last_value + if not transformer.deduplication_disabled: # compute hashes for new last rows unique_hashes = set( diff --git a/dlt/extract/incremental/lag.py b/dlt/extract/incremental/lag.py index dfafa2cd11..945915c3a8 100644 --- a/dlt/extract/incremental/lag.py +++ b/dlt/extract/incremental/lag.py @@ -1,8 +1,13 @@ +import sys from datetime import datetime, timedelta, date # noqa: I251 from typing import Union from dlt.common import logger -from dlt.common.time import ensure_pendulum_datetime, detect_datetime_format +from dlt.common.time import ( + detect_datetime_format, + ensure_pendulum_datetime_non_utc, + datatime_obj_to_str, +) from . import TCursorValue, LastValueFunc @@ -17,12 +22,12 @@ def _apply_lag_to_value( is_str = isinstance(value, str) value_format = detect_datetime_format(value) if is_str else None is_str_date = value_format in ("%Y%m%d", "%Y-%m-%d") if value_format else None - parsed_value = ensure_pendulum_datetime(value) if is_str else value + parsed_value = ensure_pendulum_datetime_non_utc(value) if is_str else value if isinstance(parsed_value, (datetime, date)): parsed_value = _apply_lag_to_datetime(lag, parsed_value, last_value_func, is_str_date) # type: ignore[assignment] # go back to string or pass exact type - value = parsed_value.strftime(value_format) if value_format else parsed_value # type: ignore[assignment] + value = datatime_obj_to_str(parsed_value, value_format) if value_format else parsed_value # type: ignore[assignment] elif isinstance(parsed_value, (int, float)): value = _apply_lag_to_number(lag, parsed_value, last_value_func) # type: ignore[assignment] diff --git a/tests/common/test_time.py b/tests/common/test_time.py index 9c7a1567e2..9b643f351e 100644 --- a/tests/common/test_time.py +++ b/tests/common/test_time.py @@ -12,8 +12,10 @@ datetime_to_timestamp, datetime_to_timestamp_ms, detect_datetime_format, + ensure_pendulum_datetime_non_utc, ) from dlt.common.typing import TAnyDateTime +from dlt.common.time import datatime_obj_to_str def test_timestamp_within() -> None: @@ -132,21 +134,21 @@ def test_datetime_to_timestamp_helpers( [ ("2024-10-20T15:30:00Z", "%Y-%m-%dT%H:%M:%SZ"), # UTC 'Z' ("2024-10-20T15:30:00.123456Z", "%Y-%m-%dT%H:%M:%S.%fZ"), # UTC 'Z' with fractional seconds - ("2024-10-20T15:30:00+02:00", "%Y-%m-%dT%H:%M:%S%z"), # Positive timezone offset + ("2024-10-20T15:30:00+02:00", "%Y-%m-%dT%H:%M:%S%:z"), # Positive timezone offset ("2024-10-20T15:30:00+0200", "%Y-%m-%dT%H:%M:%S%z"), # Positive timezone offset (no colon) ( "2024-10-20T15:30:00.123456+02:00", - "%Y-%m-%dT%H:%M:%S.%f%z", + "%Y-%m-%dT%H:%M:%S.%f%:z", ), # Positive timezone offset with fractional seconds ( "2024-10-20T15:30:00.123456+0200", "%Y-%m-%dT%H:%M:%S.%f%z", ), # Positive timezone offset with fractional seconds (no colon) - ("2024-10-20T15:30:00-02:00", "%Y-%m-%dT%H:%M:%S%z"), # Negative timezone offset + ("2024-10-20T15:30:00-02:00", "%Y-%m-%dT%H:%M:%S%:z"), # Negative timezone offset ("2024-10-20T15:30:00-0200", "%Y-%m-%dT%H:%M:%S%z"), # Negative timezone offset (no colon) ( "2024-10-20T15:30:00.123456-02:00", - "%Y-%m-%dT%H:%M:%S.%f%z", + "%Y-%m-%dT%H:%M:%S.%f%:z", ), # Negative timezone offset with fractional seconds ( "2024-10-20T15:30:00.123456-0200", @@ -170,6 +172,28 @@ def test_detect_datetime_format(value, expected_format) -> None: assert ensure_pendulum_datetime(value) is not None +@pytest.mark.parametrize( + "datetime_str, datetime_format, expected_value", + [ + ("2024-10-20T15:30:00+02:00", "%Y-%m-%dT%H:%M:%S%:z", "2024-10-20T15:30:00+02:00"), + ("2024-10-20T15:30:00+0200", "%Y-%m-%dT%H:%M:%S%z", "2024-10-20T15:30:00+0200"), + ( + "2024-10-20T15:30:00.123456-02:00", + "%Y-%m-%dT%H:%M:%S.%f%:z", + "2024-10-20T15:30:00.123456-02:00", + ), + ( + "2024-10-20T15:30:00.123456-0200", + "%Y-%m-%dT%H:%M:%S.%f%z", + "2024-10-20T15:30:00.123456-0200", + ), + ], +) +def test_datatime_obj_to_str(datetime_str, datetime_format, expected_value) -> None: + datetime = ensure_pendulum_datetime_non_utc(datetime_str) + assert datatime_obj_to_str(datetime, datetime_format) == expected_value + + @pytest.mark.parametrize( "value", [ diff --git a/tests/extract/test_incremental.py b/tests/extract/test_incremental.py index 9ad7d28e88..139eb5f466 100644 --- a/tests/extract/test_incremental.py +++ b/tests/extract/test_incremental.py @@ -606,6 +606,39 @@ def some_data(created_at=dlt.sources.incremental("created_at", initial_value)): assert s["last_value"] == initial_value + timedelta(minutes=4) +@pytest.mark.parametrize("item_type", ALL_TEST_DATA_ITEM_FORMATS) +def test_incremental_transform_return_empty_rows_with_lag(item_type: TestDataItemFormat) -> None: + @dlt.resource + def some_data( + created_at=dlt.sources.incremental( + "created_at", initial_value="2024-11-01T08:00:00+08:00", lag=3600 + ) + ): + yield from source_items + + p = dlt.pipeline(pipeline_name=uniq_id()) + + first_run_data = [{"id": 1, "value": 10, "created_at": "2024-11-01T12:00:00+08:00"}] + source_items = data_to_item_format(item_type, first_run_data) + + p.extract(some_data()) + s = p.state["sources"][p.default_schema_name]["resources"]["some_data"]["incremental"][ + "created_at" + ] + + assert s["last_value"] == "2024-11-01T12:00:00+08:00" + + second_run_data = [{"id": 1, "value": 10, "created_at": "2024-11-01T10:00:00+08:00"}] + source_items = data_to_item_format(item_type, second_run_data) + + p.extract(some_data()) + s = p.state["sources"][p.default_schema_name]["resources"]["some_data"]["incremental"][ + "created_at" + ] + + assert s["last_value"] == "2024-11-01T12:00:00+08:00" + + @pytest.mark.parametrize("item_type", ALL_TEST_DATA_ITEM_FORMATS) def test_descending_order_unique_hashes(item_type: TestDataItemFormat) -> None: """Resource returns items in descending order but using `max` last value function.