From 14d28a84fdc6a48d6da3e5e0c92d26b317b69adb Mon Sep 17 00:00:00 2001 From: rudolfix Date: Tue, 31 Oct 2023 17:18:53 +0100 Subject: [PATCH] arrow as data (#723) * moves pandas helper to libs * wrapps arrow type instances in list * allows to remove incremental with explicit none, uses column search for simple jsonpath * does not check resource binding when end_value provided * updates arrow docs * fixes aws credentials test * fixes wrappers type tests * bumps version to 0.3.23 --- dlt/common/jsonpath.py | 2 +- dlt/common/libs/pandas.py | 7 +++ dlt/destinations/sql_client.py | 2 +- dlt/extract/decorators.py | 2 +- dlt/extract/exceptions.py | 2 +- dlt/extract/incremental/__init__.py | 33 ++++++----- dlt/extract/incremental/transform.py | 49 +++++++++++----- dlt/extract/source.py | 6 +- dlt/extract/utils.py | 6 +- dlt/extract/wrappers.py | 25 +++++++++ dlt/helpers/pandas_helper.py | 56 ------------------- dlt/helpers/streamlit_helper.py | 2 +- .../verified-sources/arrow-pandas.md | 19 ++++++- pyproject.toml | 2 +- tests/extract/test_incremental.py | 52 ++++++++++++++++- tests/load/filesystem/test_aws_credentials.py | 9 +-- tests/pipeline/test_arrow_sources.py | 18 ++++++ 17 files changed, 187 insertions(+), 105 deletions(-) create mode 100644 dlt/common/libs/pandas.py create mode 100644 dlt/extract/wrappers.py delete mode 100644 dlt/helpers/pandas_helper.py diff --git a/dlt/common/jsonpath.py b/dlt/common/jsonpath.py index f5922d5d16..7808d1c69c 100644 --- a/dlt/common/jsonpath.py +++ b/dlt/common/jsonpath.py @@ -3,7 +3,7 @@ from dlt.common.typing import DictStrAny -from jsonpath_ng import parse as _parse, JSONPath +from jsonpath_ng import parse as _parse, JSONPath, Fields as JSONPathFields TJsonPath = Union[str, JSONPath] # Jsonpath compiled or str diff --git a/dlt/common/libs/pandas.py b/dlt/common/libs/pandas.py new file mode 100644 index 0000000000..93e6b764bc --- /dev/null +++ b/dlt/common/libs/pandas.py @@ -0,0 +1,7 @@ +from dlt.common.exceptions import MissingDependencyException + +try: + import pandas + from pandas.io.sql import _wrap_result +except ModuleNotFoundError: + raise MissingDependencyException("DLT Pandas Helpers", ["pandas"]) diff --git a/dlt/destinations/sql_client.py b/dlt/destinations/sql_client.py index 68fb39af09..68af420085 100644 --- a/dlt/destinations/sql_client.py +++ b/dlt/destinations/sql_client.py @@ -172,7 +172,7 @@ def _get_columns(self) -> List[str]: return [c[0] for c in self.native_cursor.description] def df(self, chunk_size: int = None, **kwargs: Any) -> Optional[DataFrame]: - from dlt.helpers.pandas_helper import _wrap_result + from dlt.common.libs.pandas import _wrap_result columns = self._get_columns() if chunk_size is None: diff --git a/dlt/extract/decorators.py b/dlt/extract/decorators.py index eb8a08e3d1..dbc5f2fa82 100644 --- a/dlt/extract/decorators.py +++ b/dlt/extract/decorators.py @@ -494,7 +494,7 @@ def transformer( selected: bool = True, spec: Type[BaseConfiguration] = None, standalone: Literal[True] = True -) -> Callable[TResourceFunParams, DltResource]: # TODO: change back to Callable[TResourceFunParams, DltResource] when mypy 1.6 is fixed +) -> Callable[TResourceFunParams, DltResource]: ... 
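The pandas helper move above (sql_client.py now imports `_wrap_result` from `dlt.common.libs.pandas`) keeps the documented way of reading query results as a DataFrame unchanged. A minimal sketch, assuming a previously loaded `orders` table and a `duckdb` destination (both made up for illustration):

```python
import dlt

pipeline = dlt.pipeline("orders_pipeline", destination="duckdb")

# the cursor returned by execute_query exposes df(), which builds the DataFrame
# via pandas' _wrap_result, now imported from dlt.common.libs.pandas
with pipeline.sql_client() as client:
    with client.execute_query("SELECT * FROM orders") as cursor:
        df = cursor.df()
```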
def transformer( diff --git a/dlt/extract/exceptions.py b/dlt/extract/exceptions.py index 85b0064160..e540a2468f 100644 --- a/dlt/extract/exceptions.py +++ b/dlt/extract/exceptions.py @@ -102,7 +102,7 @@ class InvalidResourceDataType(DltResourceException): def __init__(self, resource_name: str, item: Any, _typ: Type[Any], msg: str) -> None: self.item = item self._typ = _typ - super().__init__(resource_name, f"Cannot create resource {resource_name} from specified data. " + msg) + super().__init__(resource_name, f"Cannot create resource {resource_name} from specified data. If you want to process just one data item, enclose it in a list. " + msg) class InvalidResourceDataTypeAsync(InvalidResourceDataType): diff --git a/dlt/extract/incremental/__init__.py b/dlt/extract/incremental/__init__.py index 57593769f7..652adc19d2 100644 --- a/dlt/extract/incremental/__init__.py +++ b/dlt/extract/incremental/__init__.py @@ -28,7 +28,7 @@ from dlt.extract.pipe import Pipe from dlt.extract.utils import resolve_column_value from dlt.extract.typing import SupportsPipe, TTableHintTemplate, MapItem, YieldMapItem, FilterItem, ItemTransform -from dlt.extract.incremental.transform import JsonIncremental, ArrowIncremental, IncrementalTransformer +from dlt.extract.incremental.transform import JsonIncremental, ArrowIncremental, IncrementalTransform try: from dlt.common.libs.pyarrow import is_arrow_item, pyarrow as pa, TAnyArrowItem except MissingDependencyException: @@ -87,9 +87,10 @@ def __init__( end_value: Optional[TCursorValue] = None, allow_external_schedulers: bool = False ) -> None: + # make sure that path is valid + if cursor_path: + compile_path(cursor_path) self.cursor_path = cursor_path - if self.cursor_path: - self.cursor_path_p: JSONPath = compile_path(cursor_path) self.last_value_func = last_value_func self.initial_value = initial_value """Initial value of last_value""" @@ -109,14 +110,14 @@ def __init__( self.start_out_of_range: bool = False """Becomes true on the first item that is out of range of `start_value`. I.e. when using `max` this is a value that is lower than `start_value`""" - self._transformers: Dict[str, IncrementalTransformer] = {} + self._transformers: Dict[str, IncrementalTransform] = {} - def _make_transformers(self) -> None: + def _make_transforms(self) -> None: types = [("arrow", ArrowIncremental), ("json", JsonIncremental)] for dt, kls in types: self._transformers[dt] = kls( self.resource_name, - self.cursor_path_p, + self.cursor_path, self.start_value, self.end_value, self._cached_state, @@ -170,7 +171,7 @@ def merge(self, other: "Incremental[TCursorValue]") -> "Incremental[TCursorValue return constructor(**kwargs) # type: ignore def on_resolved(self) -> None: - self.cursor_path_p = compile_path(self.cursor_path) + compile_path(self.cursor_path) if self.end_value is not None and self.initial_value is None: raise ConfigurationValueError( "Incremental 'end_value' was specified without 'initial_value'. 'initial_value' is required when using 'end_value'." 
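The `on_resolved` check above requires `initial_value` whenever `end_value` is set. A minimal sketch of such a bounded, backfill-style range, with made-up resource and column names and hypothetical data:

```python
import dlt

@dlt.resource
def events(
    created_at=dlt.sources.incremental(
        "created_at", initial_value="2023-01-01", end_value="2023-02-01"
    )
):
    # hypothetical rows; the second one falls outside the configured range
    yield [
        {"id": 1, "created_at": "2023-01-15"},
        {"id": 2, "created_at": "2023-03-01"},
    ]

# only the row inside [initial_value, end_value) is emitted
print(list(events()))
```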
@@ -193,7 +194,6 @@ def parse_native_representation(self, native_value: Any) -> None: self.initial_value = native_value.initial_value self.last_value_func = native_value.last_value_func self.end_value = native_value.end_value - self.cursor_path_p = self.cursor_path_p self.resource_name = self.resource_name else: # TODO: Maybe check if callable(getattr(native_value, '__lt__', None)) # Passing bare value `incremental=44` gets parsed as initial_value @@ -203,9 +203,6 @@ def parse_native_representation(self, native_value: Any) -> None: def get_state(self) -> IncrementalColumnState: """Returns an Incremental state for a particular cursor column""" - if not self.resource_name: - raise IncrementalUnboundError(self.cursor_path) - if self.end_value is not None: # End value uses mock state. We don't want to write it. return { @@ -214,6 +211,9 @@ def get_state(self) -> IncrementalColumnState: 'unique_hashes': [] } + if not self.resource_name: + raise IncrementalUnboundError(self.cursor_path) + self._cached_state = Incremental._get_state(self.resource_name, self.cursor_path) if len(self._cached_state) == 0: # set the default like this, setdefault evaluates the default no matter if it is needed or not. and our default is heavy @@ -237,7 +237,7 @@ def last_value(self) -> Optional[TCursorValue]: s = self.get_state() return s['last_value'] # type: ignore - def _transform_item(self, transformer: IncrementalTransformer, row: TDataItem) -> Optional[TDataItem]: + def _transform_item(self, transformer: IncrementalTransform, row: TDataItem) -> Optional[TDataItem]: row, start_out_of_range, end_out_of_range = transformer(row) self.start_out_of_range = start_out_of_range self.end_out_of_range = end_out_of_range @@ -313,13 +313,13 @@ def bind(self, pipe: SupportsPipe) -> "Incremental[TCursorValue]": logger.info(f"Bind incremental on {self.resource_name} with initial_value: {self.initial_value}, start_value: {self.start_value}, end_value: {self.end_value}") # cache state self._cached_state = self.get_state() - self._make_transformers() + self._make_transforms() return self def __str__(self) -> str: return f"Incremental at {id(self)} for resource {self.resource_name} with cursor path: {self.cursor_path} initial {self.initial_value} lv_func {self.last_value_func}" - def _get_transformer(self, items: TDataItems) -> IncrementalTransformer: + def _get_transformer(self, items: TDataItems) -> IncrementalTransform: # Assume list is all of the same type for item in items if isinstance(items, list) else [items]: if is_arrow_item(item): @@ -397,6 +397,9 @@ def _wrap(*args: Any, **kwargs: Any) -> Any: new_incremental = p.default.merge(explicit_value) else: new_incremental = explicit_value.copy() + elif explicit_value is None: + # new_incremental not set! + pass elif isinstance(p.default, Incremental): # Passing only initial value explicitly updates the default instance new_incremental = p.default.copy() @@ -408,7 +411,7 @@ def _wrap(*args: Any, **kwargs: Any) -> Any: if is_optional_type(p.annotation): bound_args.arguments[p.name] = None # Remove partial spec return func(*bound_args.args, **bound_args.kwargs) - raise ValueError(f"{p.name} Incremental has no default") + raise ValueError(f"{p.name} Incremental argument has no default. 
Please wrap its typing in Optional[] to allow no incremental") # pass Generic information from annotation to new_incremental if not hasattr(new_incremental, "__orig_class__") and p.annotation and get_args(p.annotation): new_incremental.__orig_class__ = p.annotation # type: ignore diff --git a/dlt/extract/incremental/transform.py b/dlt/extract/incremental/transform.py index b1dffe6b28..af45736da4 100644 --- a/dlt/extract/incremental/transform.py +++ b/dlt/extract/incremental/transform.py @@ -1,5 +1,5 @@ from datetime import datetime, date # noqa: I251 -from typing import Optional, Tuple, List +from typing import Any, Optional, Tuple, List try: import pandas as pd @@ -16,7 +16,7 @@ from dlt.common.json import json from dlt.common import pendulum from dlt.common.typing import TDataItem, TDataItems -from dlt.common.jsonpath import TJsonPath, find_values +from dlt.common.jsonpath import TJsonPath, find_values, JSONPathFields, compile_path from dlt.extract.incremental.exceptions import IncrementalCursorPathMissing, IncrementalPrimaryKeyMissing from dlt.extract.incremental.typing import IncrementalColumnState, TCursorValue, LastValueFunc from dlt.extract.utils import resolve_column_value @@ -28,12 +28,11 @@ pa = None - -class IncrementalTransformer: +class IncrementalTransform: def __init__( self, resource_name: str, - cursor_path: TJsonPath, + cursor_path: str, start_value: Optional[TCursorValue], end_value: Optional[TCursorValue], incremental_state: IncrementalColumnState, @@ -48,6 +47,13 @@ def __init__( self.last_value_func = last_value_func self.primary_key = primary_key + # compile jsonpath + self._compiled_cursor_path = compile_path(cursor_path) + # for simple column name we'll fallback to search in dict + if isinstance(self._compiled_cursor_path, JSONPathFields) and len(self._compiled_cursor_path.fields) == 1 and self._compiled_cursor_path.fields[0] != "*": + self.cursor_path = self._compiled_cursor_path.fields[0] + self._compiled_cursor_path = None + def __call__( self, row: TDataItem, @@ -55,7 +61,8 @@ def __call__( ... -class JsonIncremental(IncrementalTransformer): +class JsonIncremental(IncrementalTransform): + def unique_value( self, row: TDataItem, @@ -72,6 +79,25 @@ def unique_value( except KeyError as k_err: raise IncrementalPrimaryKeyMissing(resource_name, k_err.args[0], row) + def find_cursor_value(self, row: TDataItem) -> Any: + """Finds value in row at cursor defined by self.cursor_path. + + Will use compiled JSONPath if present, otherwise it reverts to column search if row is dict + """ + row_value: Any = None + if self._compiled_cursor_path: + row_values = find_values(self._compiled_cursor_path, row) + if row_values: + row_value = row_values[0] + else: + try: + row_value = row[self.cursor_path] + except Exception: + pass + if row_value is None: + raise IncrementalCursorPathMissing(self.resource_name, self.cursor_path, row) + return row_value + def __call__( self, row: TDataItem, @@ -84,10 +110,7 @@ def __call__( if row is None: return row, start_out_of_range, end_out_of_range - row_values = find_values(self.cursor_path, row) - if not row_values: - raise IncrementalCursorPathMissing(self.resource_name, str(self.cursor_path), row) - row_value = row_values[0] + row_value = self.find_cursor_value(row) # For datetime cursor, ensure the value is a timezone aware datetime. 
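A short sketch of the two cursor forms handled by `find_cursor_value` above: a plain column name falls back to a direct dict lookup, while a dotted path goes through the compiled JSONPath. Resource names and data are made up:

```python
import dlt

@dlt.resource
def flat_events(updated_at=dlt.sources.incremental("updated_at")):
    # simple column name: resolved with a dict lookup
    yield [{"id": 1, "updated_at": "2023-10-30T10:00:00Z"}]

@dlt.resource
def nested_events(created_at=dlt.sources.incremental("data.items[0].created_at")):
    # JSON path: resolved with jsonpath_ng
    yield [{"id": 1, "data": {"items": [{"created_at": "2023-10-31T10:00:00Z"}]}}]

list(flat_events())
list(nested_events())
```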
# The object saved in state will always be a tz aware pendulum datetime so this ensures values are comparable @@ -137,7 +160,7 @@ def __call__( return row, start_out_of_range, end_out_of_range -class ArrowIncremental(IncrementalTransformer): +class ArrowIncremental(IncrementalTransform): _dlt_index = "_dlt_index" def unique_values( @@ -229,7 +252,7 @@ def __call__( # TODO: Json path support. For now assume the cursor_path is a column name - cursor_path = str(self.cursor_path) + cursor_path = self.cursor_path # The new max/min value try: orig_row_value = compute(tbl[cursor_path]) @@ -242,7 +265,7 @@ def __call__( except KeyError as e: raise IncrementalCursorPathMissing( self.resource_name, cursor_path, tbl, - f"Column name {str(cursor_path)} was not found in the arrow table. Note nested JSON paths are not supported for arrow tables and dataframes, the incremental cursor_path must be a column name." + f"Column name {cursor_path} was not found in the arrow table. Note that nested JSON paths are not supported for arrow tables and dataframes; the incremental cursor_path must be a column name." ) from e # If end_value is provided, filter to include table rows that are "less" than end_value diff --git a/dlt/extract/source.py b/dlt/extract/source.py index 1a895fc089..d36cb4b121 100644 --- a/dlt/extract/source.py +++ b/dlt/extract/source.py @@ -26,6 +26,7 @@ InvalidTransformerDataTypeGeneratorFunctionRequired, InvalidParentResourceDataType, InvalidParentResourceIsAFunction, InvalidResourceDataType, InvalidResourceDataTypeIsNone, InvalidTransformerGeneratorFunction, DataItemRequiredForDynamicTableHints, InvalidResourceDataTypeAsync, InvalidResourceDataTypeBasic, InvalidResourceDataTypeMultiplePipes, ParametrizedResourceUnbound, ResourceNameMissing, ResourceNotATransformer, ResourcesNotFoundError, DeletingResourcesNotSupported) +from dlt.extract.wrappers import wrap_additional_type def with_table_name(item: TDataItems, table_name: str) -> DataItemWithMeta: @@ -91,6 +92,9 @@ def from_data( if not name: raise ResourceNameMissing() + # wrap additional types + data = wrap_additional_type(data) + # several iterable types are not allowed and must be excluded right away if isinstance(data, (AsyncIterator, AsyncIterable)): raise InvalidResourceDataTypeAsync(name, data, type(data)) @@ -109,7 +113,7 @@ def from_data( return cls(pipe, table_schema_template, selected, incremental=incremental, section=section, args_bound=not callable(data)) else: # some other data type that is not supported - raise InvalidResourceDataType(name, data, type(data), f"The data type is {type(data).__name__}") + raise InvalidResourceDataType(name, data, type(data), f"The data type of the supplied data is {type(data).__name__}") @property def name(self) -> str: diff --git a/dlt/extract/utils.py b/dlt/extract/utils.py index bdb726c92a..3bd9f56a74 100644 --- a/dlt/extract/utils.py +++ b/dlt/extract/utils.py @@ -124,14 +124,10 @@ def _tx_partial(item: TDataItems, meta: Any = None) -> Any: def wrap_resource_gen(name: str, f: AnyFun, sig: inspect.Signature, *args: Any, **kwargs: Any) -> AnyFun: """Wraps a generator or generator function so it is evaluated on extraction""" if inspect.isgeneratorfunction(inspect.unwrap(f)) or inspect.isgenerator(f): - # if no arguments then no wrap - # if len(sig.parameters) == 0: - # return f - # always wrap generators and generator functions. evaluate only at runtime!
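The `wrap_additional_type` call added to `DltResource.from_data` above is what lets a bare DataFrame or Arrow table be passed directly as resource data. A sketch, with a made-up pipeline name and destination:

```python
import dlt
import pandas as pd
import pyarrow as pa

df = pd.DataFrame({"id": [1, 2], "name": ["a", "b"]})
table = pa.Table.from_pandas(df)

pipeline = dlt.pipeline("arrow_example", destination="duckdb")
# both objects are wrapped in a single-item list internally, so no [df] / [table] is needed
pipeline.run(df, table_name="orders")
pipeline.run(table, table_name="orders")
```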
def _partial() -> Any: - # print(f"_PARTIAL: {args} {kwargs} vs {args_}{kwargs_}") + # print(f"_PARTIAL: {args} {kwargs}") return f(*args, **kwargs) # this partial preserves the original signature and just defers the call to pipe diff --git a/dlt/extract/wrappers.py b/dlt/extract/wrappers.py new file mode 100644 index 0000000000..e8e295f245 --- /dev/null +++ b/dlt/extract/wrappers.py @@ -0,0 +1,25 @@ +from typing import Any + +from dlt.common.typing import NoneType +from dlt.common.exceptions import MissingDependencyException + + +try: + from dlt.common.libs.pandas import pandas + from dlt.common.libs.pyarrow import pyarrow + + PandaFrame, ArrowTable, ArrowRecords = pandas.DataFrame, pyarrow.Table, pyarrow.RecordBatch +except MissingDependencyException: + PandaFrame, ArrowTable, ArrowRecords = NoneType, NoneType, NoneType + + +def wrap_additional_type(data: Any) -> Any: + """Wraps any known additional type so it is accepted by DltResource""" + # pass through None: if optional deps are not defined, they fallback to None type + if data is None: + return data + + if isinstance(data, (PandaFrame, ArrowTable, ArrowRecords)): + return [data] + + return data \ No newline at end of file diff --git a/dlt/helpers/pandas_helper.py b/dlt/helpers/pandas_helper.py deleted file mode 100644 index 9c077d49a9..0000000000 --- a/dlt/helpers/pandas_helper.py +++ /dev/null @@ -1,56 +0,0 @@ -from typing import Any - -from deprecated import deprecated - -from dlt.common.exceptions import MissingDependencyException -from dlt.destinations.sql_client import SqlClientBase - -try: - import pandas as pd - from pandas.io.sql import _wrap_result -except ModuleNotFoundError: - raise MissingDependencyException("DLT Pandas Helpers", ["pandas"]) - - -@deprecated(reason="Use `df` method on cursor returned from client.execute_query") -def query_results_to_df( - client: SqlClientBase[Any], query: str, index_col: Any = None, coerce_float: bool = True, parse_dates: Any = None, dtype: Any = None -) -> pd.DataFrame: - """ - A helper function that executes a query in the destination and returns the result as Pandas `DataFrame` - - This method reuses `read_sql` method of `Pandas` with the sql client obtained from `Pipeline.sql_client` method. - - Parameters - ---------- - client (SqlClientBase[Any]): Sql Client instance - query (str): Query to be executed - index_col str or list of str, optional, default: None - Column(s) to set as index(MultiIndex). - coerce_float (bool, optional): default: True - Attempts to convert values of non-string, non-numeric objects (like - decimal.Decimal) to floating point. Useful for SQL result sets. - parse_dates : list or dict, default: None - - List of column names to parse as dates. - - Dict of ``{column_name: format string}`` where format string is - strftime compatible in case of parsing string times, or is one of - (D, s, ns, ms, us) in case of parsing integer timestamps. - - Dict of ``{column_name: arg dict}``, where the arg dict corresponds - to the keyword arguments of :func:`pandas.to_datetime` - Especially useful with databases without native Datetime support, - such as SQLite. - dtype : Type name or dict of columns - Data type for data or columns. E.g. np.float64 or - {‘a’: np.float64, ‘b’: np.int32, ‘c’: ‘Int64’}. 
- - Returns - ------- - DataFrame with the query results - """ - with client.execute_query(query) as curr: - # get column names - columns = [c[0] for c in curr.description] - # use existing panda function that converts results to data frame - # TODO: we may use `_wrap_iterator` to prevent loading the full result to memory first - pf: pd.DataFrame = _wrap_result(curr.fetchall(), columns, index_col, coerce_float, parse_dates, dtype) - return pf diff --git a/dlt/helpers/streamlit_helper.py b/dlt/helpers/streamlit_helper.py index a8881563fb..082c1242da 100644 --- a/dlt/helpers/streamlit_helper.py +++ b/dlt/helpers/streamlit_helper.py @@ -9,7 +9,7 @@ from dlt.common.exceptions import MissingDependencyException from dlt.common.destination.reference import WithStateSync -from dlt.helpers.pandas_helper import pd +from dlt.common.libs.pandas import pandas as pd from dlt.pipeline import Pipeline from dlt.pipeline.exceptions import CannotRestorePipelineException, SqlClientNotAvailable from dlt.pipeline.state_sync import load_state_from_destination diff --git a/docs/website/docs/dlt-ecosystem/verified-sources/arrow-pandas.md b/docs/website/docs/dlt-ecosystem/verified-sources/arrow-pandas.md index 795cb193a9..c8c4524ac2 100644 --- a/docs/website/docs/dlt-ecosystem/verified-sources/arrow-pandas.md +++ b/docs/website/docs/dlt-ecosystem/verified-sources/arrow-pandas.md @@ -34,7 +34,7 @@ df = pd.DataFrame({ pipeline = dlt.pipeline("orders_pipeline", destination="snowflake") -pipeline.run([df], table_name="orders") +pipeline.run(df, table_name="orders") ``` A `pyarrow` table can be loaded in the same way: @@ -46,7 +46,7 @@ import pyarrow as pa ... table = pa.Table.from_pandas(df) -pipeline.run([table], table_name="orders") +pipeline.run(table, table_name="orders") ``` Note: The data in the table must be compatible with the destination database as no data conversion is performed. Refer to the documentation of the destination for information about supported data types. @@ -131,3 +131,18 @@ The Arrow data types are translated to dlt data types as follows: | `decimal` | `decimal` | Precision and scale are determined by the type properties. | | `struct` | `complex` | | | | | | + + +## Loading nested types +All struct types are represented as `complex` and will be loaded as JSON (if the destination permits) or a string. Currently we do not support **struct** types, +even if they are supported by the destination. + +If you want to represent nested data as separate tables, you must yield pandas frames and arrow tables as records. Using the examples above: +```python +# yield pandas frame as records
+pipeline.run(df.to_dict(orient='records'), table_name="orders")
+
+# yield arrow table
+pipeline.run(table.to_pylist(), table_name="orders")
+```
+Both Pandas and Arrow allow streaming records in batches. \ No newline at end of file diff --git a/pyproject.toml b/pyproject.toml index 6789f975be..3795f0096b 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -1,6 +1,6 @@ [tool.poetry] name = "dlt" -version = "0.3.23a1" +version = "0.3.23" description = "dlt is an open-source python-first scalable data loading library that does not require any backend to run." authors = ["dltHub Inc.
"] maintainers = [ "Marcin Rudolf ", "Adrian Brudaru ", "Ty Dunn "] diff --git a/tests/extract/test_incremental.py b/tests/extract/test_incremental.py index 2b6d26ba12..9d5b37f472 100644 --- a/tests/extract/test_incremental.py +++ b/tests/extract/test_incremental.py @@ -205,7 +205,7 @@ def some_data(created_at=dlt.sources.incremental('data.items[0].created_at')): ex: PipelineStepFailed = py_ex.value assert isinstance(ex.exception, IncrementalCursorPathMissing) - assert "Column name data.items.[0].created_at was not found in the arrow table" in str(ex) + assert ex.exception.json_path == "data.items[0].created_at" @pytest.mark.parametrize("item_type", ALL_ITEM_FORMATS) @@ -508,7 +508,7 @@ def some_data(last_timestamp=dlt.sources.incremental("ts")): @pytest.mark.parametrize("item_type", ALL_ITEM_FORMATS) def test_missing_cursor_field(item_type: TItemFormat) -> None: - + os.environ["COMPLETED_PROB"] = "1.0" # make it complete immediately @dlt.resource def some_data(last_timestamp=dlt.sources.incremental("item.timestamp")): data = [{"delta": i, "ts": pendulum.now().add(days=i).timestamp()} for i in range(-10, 10)] @@ -516,9 +516,55 @@ def some_data(last_timestamp=dlt.sources.incremental("item.timestamp")): yield from source_items with pytest.raises(IncrementalCursorPathMissing) as py_ex: - list(some_data) + list(some_data()) assert py_ex.value.json_path == "item.timestamp" + # same thing when run in pipeline + with pytest.raises(PipelineStepFailed) as pip_ex: + dlt.run(some_data(), destination="dummy") + assert isinstance(pip_ex.value.__context__, IncrementalCursorPathMissing) + assert pip_ex.value.__context__.json_path == "item.timestamp" + + +def test_json_path_cursor() -> None: + + @dlt.resource + def some_data(last_timestamp=dlt.sources.incremental("item.timestamp|modifiedAt")): + yield [{ + "delta": i, + "item": { + "timestamp": pendulum.now().add(days=i).timestamp() + } + } for i in range(-10, 10)] + + yield [{ + "delta": i, + "item": { + "modifiedAt": pendulum.now().add(days=i).timestamp() + } + } for i in range(-10, 10)] + + # path should match both timestamp and modifiedAt in item + list(some_data) + + +def test_remove_incremental_with_explicit_none() -> None: + + @dlt.resource + def some_data_optional(last_timestamp: Optional[dlt.sources.incremental[float]] = dlt.sources.incremental("item.timestamp")): + assert last_timestamp is None + yield 1 + # we disable incremental by typing the argument as optional + assert list(some_data_optional(last_timestamp=None)) == [1] + + @dlt.resource(standalone=True) + def some_data(last_timestamp: dlt.sources.incremental[float] = dlt.sources.incremental("item.timestamp")): + assert last_timestamp is None + yield 1 + # we'll get the value error + with pytest.raises(ValueError): + assert list(some_data(last_timestamp=None)) == [1] + @pytest.mark.parametrize("item_type", ALL_ITEM_FORMATS) def test_filter_processed_items(item_type: TItemFormat) -> None: diff --git a/tests/load/filesystem/test_aws_credentials.py b/tests/load/filesystem/test_aws_credentials.py index f75b5fb79c..d34bc7ed24 100644 --- a/tests/load/filesystem/test_aws_credentials.py +++ b/tests/load/filesystem/test_aws_credentials.py @@ -43,11 +43,12 @@ def test_aws_credentials_from_botocore(environment: Dict[str, str]) -> None: import botocore.session session = botocore.session.get_session() + region_name = 'eu-central-1' # session.get_config_variable('region') c = AwsCredentials(session) assert c.profile_name is None assert c.aws_access_key_id == "fake_access_key" - assert c.region_name == 
session.get_config_variable('region') + assert c.region_name == region_name assert c.profile_name is None assert c.is_resolved() assert not c.is_partial() @@ -60,7 +61,7 @@ def test_aws_credentials_from_botocore(environment: Dict[str, str]) -> None: "profile": None, "endpoint_url": None, "client_kwargs": { - "region_name": session.get_config_variable('region') + "region_name": region_name } } @@ -131,7 +132,7 @@ def test_aws_credentials_with_endpoint_url(environment: Dict[str, str]) -> None: "profile": None, "endpoint_url": "https://123.r2.cloudflarestorage.com", "client_kwargs": { - "region_name": environment['AWS_DEFAULT_REGION'] + "region_name": 'eu-central-1' } } @@ -140,4 +141,4 @@ def set_aws_credentials_env(environment: Dict[str, str]) -> None: environment['AWS_ACCESS_KEY_ID'] = 'fake_access_key' environment['AWS_SECRET_ACCESS_KEY'] = 'fake_secret_key' environment['AWS_SESSION_TOKEN'] = 'fake_session_token' - environment['AWS_DEFAULT_REGION'] = 'eu-central-1' + environment["AWS_DEFAULT_REGION"] = environment['REGION_NAME'] = 'eu-central-1' diff --git a/tests/pipeline/test_arrow_sources.py b/tests/pipeline/test_arrow_sources.py index ffe72c0f66..31d5d001df 100644 --- a/tests/pipeline/test_arrow_sources.py +++ b/tests/pipeline/test_arrow_sources.py @@ -181,6 +181,24 @@ def data_frames(): assert len(pipeline.get_load_package_info(load_id).jobs["new_jobs"]) == 10 +@pytest.mark.parametrize("item_type", ["pandas", "table", "record_batch"]) +def test_arrow_as_data_loading(item_type: TArrowFormat) -> None: + os.environ["RESTORE_FROM_DESTINATION"] = "False" + os.environ["DESTINATION__LOADER_FILE_FORMAT"] = "parquet" + + item, rows = arrow_table_all_data_types(item_type) + + item_resource = dlt.resource(item, name="item") + assert id(item) == id(list(item_resource)[0]) + + pipeline_name = "arrow_" + uniq_id() + pipeline = dlt.pipeline(pipeline_name=pipeline_name, destination="dummy") + pipeline.extract(item, table_name="items") + assert len(pipeline.list_extracted_resources()) == 1 + info = pipeline.normalize() + assert info.row_counts["items"] == len(rows) + + @pytest.mark.parametrize("item_type", ["table", "pandas", "record_batch"]) def test_normalize_with_dlt_columns(item_type: TArrowFormat): item, records = arrow_table_all_data_types(item_type, num_rows=5432)
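Restating `test_remove_incremental_with_explicit_none` above as a usage sketch: typing the incremental argument as `Optional` lets callers pass an explicit `None` to disable incremental loading, while a non-Optional argument raises the improved `ValueError`:

```python
from typing import Optional

import dlt

@dlt.resource
def some_data(
    last_timestamp: Optional[dlt.sources.incremental[float]] = dlt.sources.incremental("item.timestamp")
):
    # with incremental disabled, the argument is simply None inside the resource
    assert last_timestamp is None
    yield 1

assert list(some_data(last_timestamp=None)) == [1]
```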