diff --git a/.github/workflows/test_doc_snippets.yml b/.github/workflows/test_doc_snippets.yml index b2a2f241db..e158c2d669 100644 --- a/.github/workflows/test_doc_snippets.yml +++ b/.github/workflows/test_doc_snippets.yml @@ -56,7 +56,7 @@ jobs: - name: Install dependencies # if: steps.cached-poetry-dependencies.outputs.cache-hit != 'true' - run: poetry install --no-interaction -E duckdb -E weaviate --with docs --without airflow + run: poetry install --no-interaction -E duckdb -E weaviate -E parquet --with docs --without airflow - name: Run linter and tests run: make test-and-lint-snippets diff --git a/dlt/common/configuration/exceptions.py b/dlt/common/configuration/exceptions.py index cae666dab1..f019565013 100644 --- a/dlt/common/configuration/exceptions.py +++ b/dlt/common/configuration/exceptions.py @@ -1,6 +1,8 @@ +import os from typing import Any, Mapping, Type, Tuple, NamedTuple, Sequence from dlt.common.exceptions import DltException, TerminalException +from dlt.common.utils import main_module_file_path class LookupTrace(NamedTuple): @@ -48,6 +50,14 @@ def __str__(self) -> str: msg += f'\tfor field "{f}" config providers and keys were tried in following order:\n' for tr in field_traces: msg += f'\t\tIn {tr.provider} key {tr.key} was not found.\n' + # check if entry point is run with path. this is common problem so warn the user + main_path = main_module_file_path() + main_dir = os.path.dirname(main_path) + abs_main_dir = os.path.abspath(main_dir) + if abs_main_dir != os.getcwd(): + # directory was specified + msg += "WARNING: dlt looks for .dlt folder in your current working directory and your cwd (%s) is different from directory of your pipeline script (%s).\n" % (os.getcwd(), abs_main_dir) + msg += "If you keep your secret files in the same folder as your pipeline script but run your script from some other folder, secrets/configs will not be found\n" msg += "Please refer to https://dlthub.com/docs/general-usage/credentials for more information\n" return msg diff --git a/dlt/common/configuration/specs/aws_credentials.py b/dlt/common/configuration/specs/aws_credentials.py index 6ba661ae88..8c4aabc4ee 100644 --- a/dlt/common/configuration/specs/aws_credentials.py +++ b/dlt/common/configuration/specs/aws_credentials.py @@ -1,7 +1,7 @@ from typing import Optional, Dict, Any from dlt.common.exceptions import MissingDependencyException -from dlt.common.typing import TSecretStrValue +from dlt.common.typing import TSecretStrValue, DictStrAny from dlt.common.configuration.specs import CredentialsConfiguration, CredentialsWithDefault, configspec from dlt.common.configuration.specs.exceptions import InvalidBoto3Session from dlt import version @@ -19,13 +19,16 @@ class AwsCredentialsWithoutDefaults(CredentialsConfiguration): def to_s3fs_credentials(self) -> Dict[str, Optional[str]]: """Dict of keyword arguments that can be passed to s3fs""" - return dict( + credentials: DictStrAny = dict( key=self.aws_access_key_id, secret=self.aws_secret_access_key, token=self.aws_session_token, profile=self.profile_name, endpoint_url=self.endpoint_url, ) + if self.region_name: + credentials["client_kwargs"] = {"region_name": self.region_name} + return credentials def to_native_representation(self) -> Dict[str, Optional[str]]: """Return a dict that can be passed as kwargs to boto3 session""" diff --git a/dlt/common/data_writers/buffered.py b/dlt/common/data_writers/buffered.py index 98f1b51bee..5c93e22bc6 100644 --- a/dlt/common/data_writers/buffered.py +++ b/dlt/common/data_writers/buffered.py @@ -1,4 
+1,5 @@ import gzip +from functools import reduce from typing import List, IO, Any, Optional, Type, TypeVar, Generic from dlt.common.utils import uniq_id @@ -58,6 +59,7 @@ def __init__( self._current_columns: TTableSchemaColumns = None self._file_name: str = None self._buffered_items: List[TDataItem] = [] + self._buffered_items_count: int = 0 self._writer: TWriter = None self._file: IO[Any] = None self._closed = False @@ -79,10 +81,20 @@ def write_data_item(self, item: TDataItems, columns: TTableSchemaColumns) -> Non if isinstance(item, List): # items coming in single list will be written together, not matter how many are there self._buffered_items.extend(item) + # update row count, if item supports "num_rows" it will be used to count items + if len(item) > 0 and hasattr(item[0], "num_rows"): + self._buffered_items_count += sum(tbl.num_rows for tbl in item) + else: + self._buffered_items_count += len(item) else: self._buffered_items.append(item) + # update row count, if item supports "num_rows" it will be used to count items + if hasattr(item, "num_rows"): + self._buffered_items_count += item.num_rows + else: + self._buffered_items_count += 1 # flush if max buffer exceeded - if len(self._buffered_items) >= self.buffer_max_items: + if self._buffered_items_count >= self.buffer_max_items: self._flush_items() # rotate the file if max_bytes exceeded if self._file: @@ -118,7 +130,7 @@ def _rotate_file(self) -> None: self._file_name = self.file_name_template % uniq_id(5) + "." + self._file_format_spec.file_extension def _flush_items(self, allow_empty_file: bool = False) -> None: - if len(self._buffered_items) > 0 or allow_empty_file: + if self._buffered_items_count > 0 or allow_empty_file: # we only open a writer when there are any items in the buffer and first flush is requested if not self._writer: # create new writer and write header @@ -131,7 +143,9 @@ def _flush_items(self, allow_empty_file: bool = False) -> None: # write buffer if self._buffered_items: self._writer.write_data(self._buffered_items) + # reset buffer and counter self._buffered_items.clear() + self._buffered_items_count = 0 def _flush_and_close_file(self) -> None: # if any buffered items exist, flush them diff --git a/dlt/common/data_writers/writers.py b/dlt/common/data_writers/writers.py index 73e13ec46f..daf1dbbe17 100644 --- a/dlt/common/data_writers/writers.py +++ b/dlt/common/data_writers/writers.py @@ -274,6 +274,8 @@ def write_data(self, rows: Sequence[Any]) -> None: self.writer.write_batch(row) else: raise ValueError(f"Unsupported type {type(row)}") + # count rows that got written + self.items_count += row.num_rows @classmethod def data_format(cls) -> TFileFormatSpec: diff --git a/dlt/common/schema/schema.py b/dlt/common/schema/schema.py index aef2032261..77a5ae8e8e 100644 --- a/dlt/common/schema/schema.py +++ b/dlt/common/schema/schema.py @@ -393,6 +393,18 @@ def update_normalizers(self) -> None: normalizers["json"] = normalizers["json"] or self._normalizers_config["json"] self._configure_normalizers(normalizers) + def add_type_detection(self, detection: TTypeDetections) -> None: + """Add type auto detection to the schema.""" + if detection not in self.settings["detections"]: + self.settings["detections"].append(detection) + self._compile_settings() + + def remove_type_detection(self, detection: TTypeDetections) -> None: + """Adds type auto detection to the schema.""" + if detection in self.settings["detections"]: + self.settings["detections"].remove(detection) + self._compile_settings() + def _infer_column(self, k: 
str, v: Any, data_type: TDataType = None, is_variant: bool = False) -> TColumnSchema: column_schema = TColumnSchema( name=k, diff --git a/dlt/common/schema/utils.py b/dlt/common/schema/utils.py index 32bd4ade1c..f2075ce85d 100644 --- a/dlt/common/schema/utils.py +++ b/dlt/common/schema/utils.py @@ -698,4 +698,4 @@ def standard_hints() -> Dict[TColumnHint, List[TSimpleRegex]]: def standard_type_detections() -> List[TTypeDetections]: - return ["timestamp", "iso_timestamp"] + return ["iso_timestamp"] diff --git a/dlt/destinations/filesystem/filesystem.py b/dlt/destinations/filesystem/filesystem.py index 49ad36dd16..766f384024 100644 --- a/dlt/destinations/filesystem/filesystem.py +++ b/dlt/destinations/filesystem/filesystem.py @@ -131,7 +131,7 @@ def initialize_storage(self, truncate_tables: Iterable[str] = None) -> None: for search_prefix in truncate_prefixes: if item.startswith(search_prefix): # NOTE: deleting in chunks on s3 does not raise on access denied, file non existing and probably other errors - logger.info(f"DEL {item}") + # logger.info(f"DEL {item}") # print(f"DEL {item}") self.fs_client.rm(item) except FileNotFoundError: diff --git a/dlt/extract/decorators.py b/dlt/extract/decorators.py index ec3f2bb47b..eb8a08e3d1 100644 --- a/dlt/extract/decorators.py +++ b/dlt/extract/decorators.py @@ -384,7 +384,7 @@ def decorator(f: Callable[TResourceFunParams, Any]) -> Callable[TResourceFunPara spec=spec, sections=resource_sections, sections_merge_style=ConfigSectionContext.resource_merge_style, include_defaults=spec is not None ) is_inner_resource = is_inner_callable(f) - if conf_f != incr_f and is_inner_resource: + if conf_f != incr_f and is_inner_resource and not standalone: raise ResourceInnerCallableConfigWrapDisallowed(resource_name, source_section) # get spec for wrapped function SPEC = get_fun_spec(conf_f) @@ -494,7 +494,7 @@ def transformer( selected: bool = True, spec: Type[BaseConfiguration] = None, standalone: Literal[True] = True -) -> Callable[..., DltResource]: # TODO: change back to Callable[TResourceFunParams, DltResource] when mypy 1.6 is fixed +) -> Callable[TResourceFunParams, DltResource]: # TODO: change back to Callable[TResourceFunParams, DltResource] when mypy 1.6 is fixed ... 
def transformer( diff --git a/dlt/extract/extract.py b/dlt/extract/extract.py index 7f8142e807..fdca26fcfe 100644 --- a/dlt/extract/extract.py +++ b/dlt/extract/extract.py @@ -164,14 +164,14 @@ def write_empty_file(self, table_name: str) -> None: table_name = self.schema.naming.normalize_table_identifier(table_name) self.storage.write_empty_file(self.extract_id, self.schema.name, table_name, None) - def _write_item(self, table_name: str, resource_name: str, items: TDataItems) -> None: + def _write_item(self, table_name: str, resource_name: str, items: TDataItems, columns: TTableSchemaColumns = None) -> None: # normalize table name before writing so the name match the name in schema # note: normalize function should be cached so there's almost no penalty on frequent calling # note: column schema is not required for jsonl writer used here table_name = self.schema.naming.normalize_identifier(table_name) self.collector.update(table_name) self.resources_with_items.add(resource_name) - self.storage.write_data_item(self.extract_id, self.schema.name, table_name, items, None) + self.storage.write_data_item(self.extract_id, self.schema.name, table_name, items, columns) def _write_dynamic_table(self, resource: DltResource, item: TDataItem) -> None: table_name = resource._table_name_hint_fun(item) @@ -212,6 +212,9 @@ def write_table(self, resource: DltResource, items: TDataItems, meta: Any) -> No ] super().write_table(resource, items, meta) + def _write_item(self, table_name: str, resource_name: str, items: TDataItems, columns: TTableSchemaColumns = None) -> None: + super()._write_item(table_name, resource_name, items, self.dynamic_tables[table_name][0]["columns"]) + def _write_static_table(self, resource: DltResource, table_name: str, items: TDataItems) -> None: existing_table = self.dynamic_tables.get(table_name) if existing_table is not None: diff --git a/dlt/extract/incremental/transform.py b/dlt/extract/incremental/transform.py index c77fb97b9f..b1dffe6b28 100644 --- a/dlt/extract/incremental/transform.py +++ b/dlt/extract/incremental/transform.py @@ -1,5 +1,5 @@ -from datetime import datetime # noqa: I251 -from typing import Optional, Tuple, Protocol, Mapping, Union, List +from datetime import datetime, date # noqa: I251 +from typing import Optional, Tuple, List try: import pandas as pd @@ -137,8 +137,9 @@ def __call__( return row, start_out_of_range, end_out_of_range - class ArrowIncremental(IncrementalTransformer): + _dlt_index = "_dlt_index" + def unique_values( self, item: "TAnyArrowItem", @@ -148,28 +149,34 @@ def unique_values( if not unique_columns: return [] item = item - indices = item["_dlt_index"].to_pylist() + indices = item[self._dlt_index].to_pylist() rows = item.select(unique_columns).to_pylist() return [ (index, digest128(json.dumps(row, sort_keys=True))) for index, row in zip(indices, rows) ] - def _deduplicate(self, tbl: "pa.Table", unique_columns: Optional[List[str]], aggregate: str, cursor_path: str) -> "pa.Table": - if unique_columns is None: - return tbl - group_cols = unique_columns + [cursor_path] - tbl = tbl.append_column("_dlt_index", pa.array(np.arange(tbl.num_rows))) - try: - tbl = tbl.filter( - pa.compute.is_in( - tbl['_dlt_index'], - tbl.group_by(group_cols).aggregate( - [("_dlt_index", "one"), (cursor_path, aggregate)] - )['_dlt_index_one'] - ) - ) - except KeyError as e: - raise IncrementalPrimaryKeyMissing(self.resource_name, unique_columns[0], tbl) from e + def _deduplicate(self, tbl: "pa.Table", unique_columns: Optional[List[str]], aggregate: str, cursor_path: 
str) -> "pa.Table": + """Creates unique index if necessary.""" + # create unique index if necessary + if self._dlt_index not in tbl.schema.names: + tbl = tbl.append_column(self._dlt_index, pa.array(np.arange(tbl.num_rows))) + # code below deduplicates groups that include the cursor column in the group id. that was just artifact of + # json incremental and there's no need to duplicate it here + + # if unique_columns is None: + # return tbl + # group_cols = unique_columns + [cursor_path] + # try: + # tbl = tbl.filter( + # pa.compute.is_in( + # tbl[self._dlt_index], + # tbl.group_by(group_cols).aggregate( + # [(self._dlt_index, "one"), (cursor_path, aggregate)] + # )[f'{self._dlt_index}_one'] + # ) + # ) + # except KeyError as e: + # raise IncrementalPrimaryKeyMissing(self.resource_name, unique_columns[0], tbl) from e return tbl def __call__( @@ -180,6 +187,25 @@ def __call__( if is_pandas: tbl = pa.Table.from_pandas(tbl) + primary_key = self.primary_key(tbl) if callable(self.primary_key) else self.primary_key + if primary_key: + # create a list of unique columns + if isinstance(primary_key, str): + unique_columns = [primary_key] + else: + unique_columns = list(primary_key) + # check if primary key components are in the table + for pk in unique_columns: + if pk not in tbl.schema.names: + raise IncrementalPrimaryKeyMissing(self.resource_name, pk, tbl) + # use primary key as unique index + if isinstance(primary_key, str): + self._dlt_index = primary_key + elif primary_key is None: + unique_columns = tbl.column_names + else: # deduplicating is disabled + unique_columns = None + start_out_of_range = end_out_of_range = False if not tbl: # row is None or empty arrow table return tbl, start_out_of_range, end_out_of_range @@ -206,24 +232,19 @@ def __call__( cursor_path = str(self.cursor_path) # The new max/min value try: - row_value = compute(tbl[cursor_path]).as_py() + orig_row_value = compute(tbl[cursor_path]) + row_value = orig_row_value.as_py() + # dates are not represented as datetimes but I see connector-x represents + # datetimes as dates and keeping the exact time inside. probably a bug + # but can be corrected this way + if isinstance(row_value, date) and not isinstance(row_value, datetime): + row_value = pendulum.from_timestamp(orig_row_value.cast(pa.int64()).as_py() / 1000) except KeyError as e: raise IncrementalCursorPathMissing( self.resource_name, cursor_path, tbl, f"Column name {str(cursor_path)} was not found in the arrow table. Note nested JSON paths are not supported for arrow tables and dataframes, the incremental cursor_path must be a column name." 
) from e

-        primary_key = self.primary_key(tbl) if callable(self.primary_key) else self.primary_key
-        if primary_key:
-            if isinstance(primary_key, str):
-                unique_columns = [primary_key]
-            else:
-                unique_columns = list(primary_key)
-        elif primary_key is None:
-            unique_columns = tbl.column_names
-        else:  # deduplicating is disabled
-            unique_columns = None
-
         # If end_value is provided, filter to include table rows that are "less" than end_value
         if self.end_value is not None:
             tbl = tbl.filter(end_compare(tbl[cursor_path], self.end_value))
@@ -247,7 +268,7 @@ def __call__(
                 unique_values = [(i, uq_val) for i, uq_val in unique_values if uq_val in self.incremental_state['unique_hashes']]
                 remove_idx = pa.array(i for i, _ in unique_values)
                 # Filter the table
-                tbl = tbl.filter(pa.compute.invert(pa.compute.is_in(tbl["_dlt_index"], remove_idx)))
+                tbl = tbl.filter(pa.compute.invert(pa.compute.is_in(tbl[self._dlt_index], remove_idx)))

         if new_value_compare(row_value, last_value).as_py() and row_value != last_value:  # Last value has changed
             self.incremental_state['last_value'] = row_value
diff --git a/tests/common/test_data_writers/__init__.py b/docs/examples/connector_x_arrow/__init__.py
similarity index 100%
rename from tests/common/test_data_writers/__init__.py
rename to docs/examples/connector_x_arrow/__init__.py
diff --git a/docs/website/blog/2023-10-23-arrow-loading.md b/docs/website/blog/2023-10-23-arrow-loading.md
new file mode 100644
index 0000000000..80e6744df5
--- /dev/null
+++ b/docs/website/blog/2023-10-23-arrow-loading.md
@@ -0,0 +1,141 @@
+---
+slug: dlt-arrow-loading
+title: "Get 30x speedups when reading databases with ConnectorX + Arrow + dlt"
+image: /img/arrow_30x_faster.png
+authors:
+  name: Marcin Rudolf
+  title: dltHub CTO
+  url: https://github.com/rudolfix
+  image_url: https://avatars.githubusercontent.com/u/17202864
+tags: [arrow, Rust, ConnectorX]
+---
+
+If Rust + Arrow + DuckDB is the new data engineering stack, you can now get a feel for it with `dlt`. We recently added native loading of Arrow tables (and pandas frames). What does that mean? You can pass an Arrow table to the `dlt` **pipeline.run** or **pipeline.extract** methods, have it normalized, saved to parquet and loaded to your destination.
+
+Here we achieved a ~30x speedup when loading data from a (local) postgres database using ConnectorX + Arrow compared to SqlAlchemy + json (both use dlt as the engine, read the disclaimer at the end!).
+
+### Load postgres table with Arrow
+
+We’ll start with the [ConnectorX library](https://github.com/sfu-db/connector-x), which creates Arrow tables from SQL queries on most of the popular database engines.
+
+```sh
+pip install connectorx
+```
+
+The lib has Rust inside, does zero-copy extraction and is amazingly fast. We’ll extract and normalize 10 000 000 [test rows](https://github.com/dlt-hub/verified-sources/blob/master/tests/sql_database/sql_source.py#L88) from a local postgresql database. The table **chat_message** looks like a dump of Slack messages.
+Messages have a unique autoincrement **id** which we use to load in chunks:
+
+```python
+import connectorx as cx
+import dlt
+from dlt.sources.credentials import ConnectionStringCredentials
+
+def read_sql_x(
+    conn_str: str
+):
+    # load in chunks by one million
+    for _id in range(1, 10_000_001, 1_000_000):
+        table = cx.read_sql(conn_str,
+            "SELECT * FROM arrow_test_2.chat_message WHERE id BETWEEN %i AND %i" % (_id, _id + 1000000 - 1),
+            return_type="arrow2",
+            protocol="binary"
+        )
+        yield table
+
+chat_messages = dlt.resource(
+    read_sql_x,
+    name="chat_messages"
+)("postgresql://loader:loader@localhost:5432/dlt_data")
+```
+
+In this demo I just extract and normalize the data and skip the loading step.
+
+```python
+pipeline = dlt.pipeline(destination="duckdb", full_refresh=True)
+# extract first
+pipeline.extract(chat_messages)
+info = pipeline.normalize()
+# print count of items normalized
+print(info)
+# print the execution trace
+print(pipeline.last_trace)
+```
+
+Let’s run it:
+
+```sh
+$ PROGRESS=enlighten python connector_x_speed.py
+Items 10000001 [00:00, 241940483.70/s]
+Normalized data for the following tables:
+- _dlt_pipeline_state: 1 row(s)
+- chat_messages: 10000000 row(s)
+
+Run started at 2023-10-23T19:06:55.527176+00:00 and COMPLETED in 16.17 seconds with 2 steps.
+Step extract COMPLETED in 16.09 seconds.
+
+Step normalize COMPLETED in 0.08 seconds.
+```
+### Load postgres table with SqlAlchemy
+
+Here’s the corresponding code working with **SqlAlchemy**. We process 10 000 000 rows, yielding them in 100k row chunks, and normalize to parquet in 3 parallel processes.
+
+```python
+from itertools import islice
+import dlt
+from sqlalchemy import create_engine
+
+CHUNK_SIZE=100000
+
+def read_sql_a(conn_str: str):
+    engine = create_engine(conn_str)
+    with engine.connect() as conn:
+        rows = conn.execution_options(yield_per=CHUNK_SIZE).exec_driver_sql("SELECT * FROM arrow_test_2.chat_message")
+        while rows_slice := list(islice(map(lambda row: dict(row._mapping), rows), CHUNK_SIZE)):
+            yield rows_slice
+
+chat_messages = dlt.resource(
+    read_sql_a,
+    name="chat_messages",
+    write_disposition="append",
+)("postgresql://loader:loader@localhost:5432/dlt_data")
+
+pipeline = dlt.pipeline(destination="duckdb", full_refresh=True)
+# extract first
+pipeline.extract(chat_messages)
+info = pipeline.normalize(workers=3, loader_file_format="parquet")
+print(info)
+print(pipeline.last_trace)
+```
+
+Let’s run it:
+
+```sh
+$ PROGRESS=enlighten python sql_alchemy_speed.py
+Normalized data for the following tables:
+- _dlt_pipeline_state: 1 row(s)
+- chat_messages: 10000000 row(s)
+
+Run started at 2023-10-23T19:13:55.898598+00:00 and COMPLETED in 8 minutes and 12.97 seconds with 2 steps.
+Step extract COMPLETED in 3 minutes and 32.75 seconds.
+
+Step normalize COMPLETED in 3 minutes and 40.22 seconds.
+Normalized data for the following tables:
+- _dlt_pipeline_state: 1 row(s)
+- chat_messages: 10000000 row(s)
+```
+
+### Results
+
+So we can see a **~30x overall speedup on the extract and normalize steps** (~16 seconds vs ~8 minutes). The **extract step is ~13x faster**, while **normalize is a few thousand times faster**. The Arrow normalizer just checks the schemas and moves parquet files around. The JSON normalizer inspects every row to first infer the schema and then to validate the data.
+
+As the output of both methods is the same (parquet files), the actual load step takes the same time in both cases and is not compared. I could easily push the load packages (parquet files) to [any of the supported destinations](https://dlthub.com/docs/dlt-ecosystem/verified-sources/arrow-pandas#destinations-that-support-parquet-for-direct-loading).
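+
+To complete the run, a single call is enough. A minimal sketch (assuming the same `pipeline` object and the `duckdb` destination configured above):
+
+```python
+# load the normalized parquet packages into the configured destination
+load_info = pipeline.load()
+print(load_info)
+```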
+
+### What’s next:
+- [Read our docs on Arrow](https://dlthub.com/docs/dlt-ecosystem/verified-sources/arrow-pandas)
+- [Add merge and incremental loads to the code above](https://dlthub.com/docs/examples/connector_x_arrow/)
+- I'm on [dltHub Slack](https://join.slack.com/t/dlthub-community/shared_invite/zt-1slox199h-HAE7EQoXmstkP_bTqal65g) all the time.
+
+### Disclaimers
+
+- The playing field is not level. The classical (SqlAlchemy) `dlt` run processes data row by row, inferring and validating the schema. That’s why it is so slow. The Arrow version benefits from the fact that the data is already structured in the source.
+- We load from a local database, so the network roundtrip during extraction is not included. That isolates the Arrow speedups well. With a remote database engine, the speedups will be smaller.
+- You could optimize the extract step (both classical and Arrow) by reading data from **postgres** [in parallel](https://dlthub.com/docs/examples/transformers/#using-transformers-with-the-pokemon-api) or by using partitions in ConnectorX.
\ No newline at end of file
diff --git a/docs/website/docs/dlt-ecosystem/verified-sources/arrow-pandas.md b/docs/website/docs/dlt-ecosystem/verified-sources/arrow-pandas.md
index 5840fb3049..85796b0333 100644
--- a/docs/website/docs/dlt-ecosystem/verified-sources/arrow-pandas.md
+++ b/docs/website/docs/dlt-ecosystem/verified-sources/arrow-pandas.md
@@ -85,6 +85,10 @@ pipeline.run(orders)
 pipeline.run(orders)
 ```
 
+:::tip
+Look at the [Connector X + Arrow Example](../../examples/connector_x_arrow/) to see how to load data from production databases fast.
+::: + ## Supported Arrow data types The Arrow data types are translated to dlt data types as follows: diff --git a/docs/website/docs/examples/connector_x_arrow/__init__.py b/docs/website/docs/examples/connector_x_arrow/__init__.py new file mode 100644 index 0000000000..e69de29bb2 diff --git a/docs/website/docs/examples/connector_x_arrow/code/__init__.py b/docs/website/docs/examples/connector_x_arrow/code/__init__.py new file mode 100644 index 0000000000..e69de29bb2 diff --git a/docs/website/docs/examples/connector_x_arrow/code/load_arrow-snippets.py b/docs/website/docs/examples/connector_x_arrow/code/load_arrow-snippets.py new file mode 100644 index 0000000000..488cb8a7cb --- /dev/null +++ b/docs/website/docs/examples/connector_x_arrow/code/load_arrow-snippets.py @@ -0,0 +1,41 @@ +def connector_x_snippet() -> None: + + # @@@DLT_SNIPPET_START markdown_source + + import connectorx as cx + + import dlt + from dlt.sources.credentials import ConnectionStringCredentials + + def read_sql_x( + conn_str: ConnectionStringCredentials = dlt.secrets.value, + query: str = dlt.config.value + ): + yield cx.read_sql(conn_str.to_native_representation(), query, return_type="arrow2", protocol="binary") + + # create genome resource with merge on `upid` primary key + genome = dlt.resource( + name="genome", + write_disposition="merge", + primary_key="upid", + standalone=True + )(read_sql_x)( + "mysql://rfamro@mysql-rfam-public.ebi.ac.uk:4497/Rfam", # type: ignore[arg-type] + "SELECT * FROM genome ORDER BY created LIMIT 1000" + ) + # add incremental on created at + genome.apply_hints(incremental=dlt.sources.incremental("created")) + # @@@DLT_SNIPPET_END markdown_source + # @@@DLT_SNIPPET_START markdown_pipeline + __name__ = "__main__" # @@@DLT_REMOVE + if __name__ == "__main__": + pipeline = dlt.pipeline(destination="duckdb") + print(pipeline.run(genome)) + print(pipeline.last_trace.last_normalize_info) + # NOTE: run pipeline again to see that no more records got loaded thanks to incremental working + # @@@DLT_SNIPPET_END markdown_pipeline + + # check that stuff was loaded + row_counts = pipeline.last_trace.last_normalize_info.row_counts + assert row_counts["genome"] == 1000 + diff --git a/docs/website/docs/examples/connector_x_arrow/index.md b/docs/website/docs/examples/connector_x_arrow/index.md new file mode 100644 index 0000000000..3d978deaaa --- /dev/null +++ b/docs/website/docs/examples/connector_x_arrow/index.md @@ -0,0 +1,75 @@ +--- +title: Load mysql table with ConnectorX & Arrow +description: Load data from sql queries fast with connector x and arrow tables +keywords: [connector x, pyarrow, zero copy] +--- + +import Header from '../_examples-header.md'; + +
+
+## Load mysql table with ConnectorX and Arrow
+
+The example script below takes genome data from a public **mysql** instance and then loads it into **duckdb**. Mind that your destination
+must support loading of parquet files as this is the format that `dlt` uses to save arrow tables. [Connector X](https://github.com/sfu-db/connector-x) lets you
+get data from several popular databases and creates an in-memory Arrow table which `dlt` then saves to a load package and loads to the destination.
+:::tip
+You can yield several tables if your data is large and you need to partition your load.
+:::
+
+We'll learn:
+
+- How to get arrow tables from [connector X](https://github.com/sfu-db/connector-x) and yield them.
+- That merge and incremental loads work with arrow tables.
+- How to enable [incremental loading](../../general-usage/incremental-loading) for efficient data extraction.
+- How to use the built-in ConnectionString credentials.
+
+
+
+### Loading code
+
+
+```py
+import connectorx as cx
+
+import dlt
+from dlt.sources.credentials import ConnectionStringCredentials
+
+def read_sql_x(
+    conn_str: ConnectionStringCredentials = dlt.secrets.value,
+    query: str = dlt.config.value
+):
+    yield cx.read_sql(conn_str.to_native_representation(), query, return_type="arrow2", protocol="binary")
+
+# create genome resource with merge on `upid` primary key
+genome = dlt.resource(
+    name="genome",
+    write_disposition="merge",
+    primary_key="upid",
+    standalone=True
+)(read_sql_x)(
+    "mysql://rfamro@mysql-rfam-public.ebi.ac.uk:4497/Rfam",  # type: ignore[arg-type]
+    "SELECT * FROM genome ORDER BY created LIMIT 1000"
+)
+# add incremental on created at
+genome.apply_hints(incremental=dlt.sources.incremental("created"))
+```
+
+
+Run the pipeline:
+
+
+```py
+if __name__ == "__main__":
+    pipeline = dlt.pipeline(destination="duckdb")
+    print(pipeline.run(genome))
+    print(pipeline.last_trace.last_normalize_info)
+    # NOTE: run pipeline again to see that no more records got loaded thanks to incremental working
+```
+
diff --git a/docs/website/docs/general-usage/destination-tables.md b/docs/website/docs/general-usage/destination-tables.md
index 8f95639f87..8e1f771e47 100644
--- a/docs/website/docs/general-usage/destination-tables.md
+++ b/docs/website/docs/general-usage/destination-tables.md
@@ -163,6 +163,11 @@ case the primary key or other unique columns are defined.
 
 During a pipeline run, dlt [normalizes both table and column names](schema.md#naming-convention) to ensure compatibility with the destination database's accepted format. All names from your source data will be transformed into snake_case and will only include alphanumeric characters. Please be aware that the names in the destination database may differ somewhat from those in your original input.
 
+### Variant columns
+If your data has inconsistent types, `dlt` will dispatch the data to several **variant columns**. For example, if you have a resource (i.e. a json file) with a field named **answer** and your data contains boolean values, you will get a column named **answer** of type **BOOLEAN** in your destination. If, for some reason, on the next load you get an integer value and a string value in **answer**, the inconsistent data will go to the **answer__v_bigint** and **answer__v_text** columns respectively.
+The general naming rule for variant columns is `<original_name>__v_<type>` where `original_name` is the existing column name (with a data type clash) and `type` is the name of the data type stored in the variant.
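+
+Below is a minimal sketch of how this plays out (assuming a local `duckdb` destination and a hypothetical `poll` table):
+
+```py
+import dlt
+
+pipeline = dlt.pipeline(pipeline_name="variant_demo", destination="duckdb")
+# the first load infers `answer` as a BOOLEAN column
+pipeline.run([{"answer": True}], table_name="poll")
+# the next load brings clashing types, so the new values land in the variant
+# columns `answer__v_bigint` and `answer__v_text` instead of `answer`
+pipeline.run([{"answer": 42}, {"answer": "maybe"}], table_name="poll")
+```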
+ + ## Load Packages and Load IDs Each execution of the pipeline generates one or more load packages. A load package typically contains data retrieved from diff --git a/docs/website/docs/general-usage/schema.md b/docs/website/docs/general-usage/schema.md index e27a87e803..ee73aea54e 100644 --- a/docs/website/docs/general-usage/schema.md +++ b/docs/website/docs/general-usage/schema.md @@ -243,12 +243,12 @@ data itself. The `dlt.source` decorator accepts a schema instance that you can create yourself and modify in whatever way you wish. The decorator also support a few typical use cases: -### 1. Schema created implicitly by decorator +### Schema created implicitly by decorator If no schema instance is passed, the decorator creates a schema with the name set to source name and all the settings to default. -### 2. Automatically load schema file stored with source python module +### Automatically load schema file stored with source python module If no schema instance is passed, and a file with a name `{source name}_schema.yml` exists in the same folder as the module with the decorated function, it will be automatically loaded and used as @@ -256,7 +256,7 @@ the schema. This should make easier to bundle a fully specified (or pre-configured) schema with a source. -### 3. Schema is modified in the source function body +### Schema is modified in the source function body What if you can configure your schema or add some tables only inside your schema function, when i.e. you have the source credentials and user settings available? You could for example add detailed @@ -264,7 +264,7 @@ schemas of all the database tables when someone requests a table data to be load is available only at the moment source function is called. Similarly to the `source_state()` and `resource_state()` , source and resource function has current -schema available via `dlt.current.source_schema`. +schema available via `dlt.current.source_schema()`. Example: @@ -273,9 +273,10 @@ Example: def textual(nesting_level: int): # get the source schema from the `current` context schema = dlt.current.source_schema() - # remove date detector and add type detector that forces all fields to strings - schema._settings["detections"].remove("iso_timestamp") - schema._settings["detections"].insert(0, "all_text") + # remove date detector + schema.remove_type_detection("iso_timestamp") + # convert UNIX timestamp (float, withing a year from NOW) into timestamp + schema.add_type_detection("timestamp") schema.compile_settings() return dlt.resource(...) diff --git a/docs/website/docs/reference/performance.md b/docs/website/docs/reference/performance.md index 20d3d9bb8d..8e9823cbd2 100644 --- a/docs/website/docs/reference/performance.md +++ b/docs/website/docs/reference/performance.md @@ -92,22 +92,22 @@ Below we set files to rotated after 100.000 items written or when the filesize e # extract and normalize stages [data_writer] file_max_items=100000 -max_file_size=1000000 +file_max_bytes=1000000 # only for the extract stage - for all sources [sources.data_writer] file_max_items=100000 -max_file_size=1000000 +file_max_bytes=1000000 # only for the extract stage of a source with name zendesk_support [sources.zendesk_support.data_writer] file_max_items=100000 -max_file_size=1000000 +file_max_bytes=1000000 # only for the normalize stage [normalize.data_writer] file_max_items=100000 -max_file_size=1000000 +file_max_bytes=1000000 ``` @@ -235,7 +235,7 @@ The **normalize** stage uses a process pool to create load package concurrently. 
```toml [extract.data_writer] # force extract file rotation if size exceeds 1MiB -max_file_size=1000000 +file_max_bytes=1000000 [normalize] # use 3 worker processes to process 3 files in parallel @@ -260,7 +260,7 @@ As before, **if you have just a single table with millions of records you should ```toml [normalize.data_writer] # force normalize file rotation if it exceeds 1MiB -max_file_size=1000000 +file_max_bytes=1000000 [load] # have 50 concurrent load jobs diff --git a/docs/website/docs/reference/performance_snippets/toml-snippets.toml b/docs/website/docs/reference/performance_snippets/toml-snippets.toml index db33235eb9..d028e9b145 100644 --- a/docs/website/docs/reference/performance_snippets/toml-snippets.toml +++ b/docs/website/docs/reference/performance_snippets/toml-snippets.toml @@ -21,22 +21,22 @@ buffer_max_items=100 # extract and normalize stages [data_writer] file_max_items=100000 -max_file_size=1000000 +file_max_bytes=1000000 # only for the extract stage - for all sources [sources.data_writer] file_max_items=100000 -max_file_size=1000000 +file_max_bytes=1000000 # only for the extract stage of a source with name zendesk_support [sources.zendesk_support.data_writer] file_max_items=100000 -max_file_size=1000000 +file_max_bytes=1000000 # only for the normalize stage [normalize.data_writer] file_max_items=100000 -max_file_size=1000000 +file_max_bytes=1000000 # @@@DLT_SNIPPET_END file_size_toml @@ -73,7 +73,7 @@ max_parallel_items=10 # @@@DLT_SNIPPET_START normalize_workers_toml [extract.data_writer] # force extract file rotation if size exceeds 1MiB -max_file_size=1000000 +file_max_bytes=1000000 [normalize] # use 3 worker processes to process 3 files in parallel @@ -84,7 +84,7 @@ workers=3 # @@@DLT_SNIPPET_START normalize_workers_2_toml [normalize.data_writer] # force normalize file rotation if it exceeds 1MiB -max_file_size=1000000 +file_max_bytes=1000000 [load] # have 50 concurrent load jobs @@ -106,7 +106,7 @@ request_max_retry_delay = 30 # Cap exponential delay to 30 seconds next_item_mode="round_robin" [sources.my_pipeline.extract] # setting for the "my_pipeline" pipeline -next_item_mode="fifo" +next_item_mode="fifo" # @@@DLT_SNIPPET_END item_mode_toml diff --git a/docs/website/sidebars.js b/docs/website/sidebars.js index befbfc0b2d..f88ccbdf1a 100644 --- a/docs/website/sidebars.js +++ b/docs/website/sidebars.js @@ -224,6 +224,7 @@ const sidebars = { items: [ 'examples/transformers/index', 'examples/incremental_loading/index', + 'examples/connector_x_arrow/index', ], }, { diff --git a/docs/website/static/img/arrow_30x_faster.png b/docs/website/static/img/arrow_30x_faster.png new file mode 100644 index 0000000000..f0c06ff5e1 Binary files /dev/null and b/docs/website/static/img/arrow_30x_faster.png differ diff --git a/mypy.ini b/mypy.ini index 58d617a11c..0d7ab84da9 100644 --- a/mypy.ini +++ b/mypy.ini @@ -99,5 +99,7 @@ ignore_missing_imports=true [mypy-utils.*] ignore_missing_imports=true +[mypy-connectorx] +ignore_missing_imports=true [mypy-s3fs.*] ignore_missing_imports=true \ No newline at end of file diff --git a/poetry.lock b/poetry.lock index 5c7b40d6ae..15c257b607 100644 --- a/poetry.lock +++ b/poetry.lock @@ -1286,6 +1286,14 @@ python-versions = ">=3.6" [package.extras] testing = ["flake8", "pytest", "pytest-cov", "pytest-virtualenv", "pytest-xdist", "sphinx"] +[[package]] +name = "connectorx" +version = "0.3.1" +description = "" +category = "dev" +optional = false +python-versions = "*" + [[package]] name = "connexion" version = "2.14.1" @@ -3006,7 +3014,7 @@ 
python-versions = ">=3.7" [[package]] name = "mypy" -version = "1.6.0" +version = "1.6.1" description = "Optional static typing for Python" category = "dev" optional = false @@ -4853,7 +4861,7 @@ weaviate = ["weaviate-client"] [metadata] lock-version = "1.1" python-versions = ">=3.8.1,<4.0" -content-hash = "518cafbab95729a8551e617a770ddd6487a0f4cbb4171c04f3fa7f5605e32328" +content-hash = "7d5b9bfb96bfd08e2b6843df885a3ff605abe603250db78e35350e18bc933a64" [metadata.files] about-time = [ @@ -5319,6 +5327,24 @@ configupdater = [ {file = "ConfigUpdater-3.1.1-py2.py3-none-any.whl", hash = "sha256:805986dbeba317886c7a8d348b2e34986dc9e3128cd3761ecc35decbd372b286"}, {file = "ConfigUpdater-3.1.1.tar.gz", hash = "sha256:46f0c74d73efa723776764b43c9739f68052495dd3d734319c1d0eb58511f15b"}, ] +connectorx = [ + {file = "connectorx-0.3.1-cp310-cp310-macosx_10_7_x86_64.whl", hash = "sha256:719750045e7c3b94c199271fbfe6aef47944768e711f27bcc606b498707e0054"}, + {file = "connectorx-0.3.1-cp310-cp310-macosx_11_0_arm64.whl", hash = "sha256:aed31b08acebeb3ebbe53c0df846c686e7c27c4242bff3a75b72cf517d070257"}, + {file = "connectorx-0.3.1-cp310-cp310-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:71d2c2678339fb01f89469bbe22e66e75cabcf727a52ed72d576fef5744ebc58"}, + {file = "connectorx-0.3.1-cp310-none-win_amd64.whl", hash = "sha256:92e576ef9610b59f8e5456c12d22e5b0752d0207f586df82701987657909888b"}, + {file = "connectorx-0.3.1-cp37-cp37m-macosx_10_7_x86_64.whl", hash = "sha256:36c28cc59220998928e7b283eecf404e17e077dc3e525570096d0968b192cc64"}, + {file = "connectorx-0.3.1-cp37-cp37m-macosx_11_0_arm64.whl", hash = "sha256:c5173e7252f593c46787627a46561b0d949eb80ab23321e045bbf6bd5131945c"}, + {file = "connectorx-0.3.1-cp37-cp37m-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:3c8411631750d24c12e5e296720637909b8515d5faa3b5eaf7bb86c582d02667"}, + {file = "connectorx-0.3.1-cp37-none-win_amd64.whl", hash = "sha256:0674b6389f8f2ba62155ac2f718df18f76f9de5c50d9911a5fefe7485e1c598e"}, + {file = "connectorx-0.3.1-cp38-cp38-macosx_10_7_x86_64.whl", hash = "sha256:324c5075e8aa6698db8c877cb847f0d86172784db88ac0f3e6762aa9852330f3"}, + {file = "connectorx-0.3.1-cp38-cp38-macosx_11_0_arm64.whl", hash = "sha256:027a3880629a7b33ae0c7a80ab4fa53286957a253af2dfe34f19adfea6b79b91"}, + {file = "connectorx-0.3.1-cp38-cp38-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:a666b967958fcf9fc0444a7b3603483ee23a2fe39f0da3d545ff199f376f7e4b"}, + {file = "connectorx-0.3.1-cp38-none-win_amd64.whl", hash = "sha256:3c5dedfd75cf44898c17cc84a1dd0ab6ed0fa54de0461f2d6aa4bcb2c2b0dc1d"}, + {file = "connectorx-0.3.1-cp39-cp39-macosx_10_7_x86_64.whl", hash = "sha256:354c4126bcd7a9efbb8879feac92e1e7b0d0712f7e98665c392af663805491f8"}, + {file = "connectorx-0.3.1-cp39-cp39-macosx_11_0_arm64.whl", hash = "sha256:3011e1f9a27fd2a7b12c6a45bc29f6e7577a27418a3f607adaf54b301ff09068"}, + {file = "connectorx-0.3.1-cp39-cp39-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:f1efb6ed547acc5837c2211e3d65d22948019d1653e7b30e522a4a4bd6d25fa8"}, + {file = "connectorx-0.3.1-cp39-none-win_amd64.whl", hash = "sha256:001b473e600b6d25af83b32674f98dccf49705a59bd6df724b5ba9beb236a0e0"}, +] connexion = [ {file = "connexion-2.14.1-py2.py3-none-any.whl", hash = "sha256:f343717241b4c4802a694c38fee66fb1693c897fe4ea5a957fa9b3b07caf6394"}, {file = "connexion-2.14.1.tar.gz", hash = "sha256:99aa5781e70a7b94f8ffae8cf89f309d49cdb811bbd65a8e2f2546f3b19a01e6"}, @@ -6457,33 +6483,33 @@ multidict = [ {file = "multidict-6.0.4.tar.gz", hash = 
"sha256:3666906492efb76453c0e7b97f2cf459b0682e7402c0489a95484965dbc1da49"}, ] mypy = [ - {file = "mypy-1.6.0-cp310-cp310-macosx_10_9_x86_64.whl", hash = "sha256:091f53ff88cb093dcc33c29eee522c087a438df65eb92acd371161c1f4380ff0"}, - {file = "mypy-1.6.0-cp310-cp310-macosx_11_0_arm64.whl", hash = "sha256:eb7ff4007865833c470a601498ba30462b7374342580e2346bf7884557e40531"}, - {file = "mypy-1.6.0-cp310-cp310-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:49499cf1e464f533fc45be54d20a6351a312f96ae7892d8e9f1708140e27ce41"}, - {file = "mypy-1.6.0-cp310-cp310-musllinux_1_1_x86_64.whl", hash = "sha256:4c192445899c69f07874dabda7e931b0cc811ea055bf82c1ababf358b9b2a72c"}, - {file = "mypy-1.6.0-cp310-cp310-win_amd64.whl", hash = "sha256:3df87094028e52766b0a59a3e46481bb98b27986ed6ded6a6cc35ecc75bb9182"}, - {file = "mypy-1.6.0-cp311-cp311-macosx_10_9_x86_64.whl", hash = "sha256:3c8835a07b8442da900db47ccfda76c92c69c3a575872a5b764332c4bacb5a0a"}, - {file = "mypy-1.6.0-cp311-cp311-macosx_11_0_arm64.whl", hash = "sha256:24f3de8b9e7021cd794ad9dfbf2e9fe3f069ff5e28cb57af6f873ffec1cb0425"}, - {file = "mypy-1.6.0-cp311-cp311-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:856bad61ebc7d21dbc019b719e98303dc6256cec6dcc9ebb0b214b81d6901bd8"}, - {file = "mypy-1.6.0-cp311-cp311-musllinux_1_1_x86_64.whl", hash = "sha256:89513ddfda06b5c8ebd64f026d20a61ef264e89125dc82633f3c34eeb50e7d60"}, - {file = "mypy-1.6.0-cp311-cp311-win_amd64.whl", hash = "sha256:9f8464ed410ada641c29f5de3e6716cbdd4f460b31cf755b2af52f2d5ea79ead"}, - {file = "mypy-1.6.0-cp312-cp312-macosx_10_9_x86_64.whl", hash = "sha256:971104bcb180e4fed0d7bd85504c9036346ab44b7416c75dd93b5c8c6bb7e28f"}, - {file = "mypy-1.6.0-cp312-cp312-macosx_11_0_arm64.whl", hash = "sha256:ab98b8f6fdf669711f3abe83a745f67f50e3cbaea3998b90e8608d2b459fd566"}, - {file = "mypy-1.6.0-cp312-cp312-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:1a69db3018b87b3e6e9dd28970f983ea6c933800c9edf8c503c3135b3274d5ad"}, - {file = "mypy-1.6.0-cp312-cp312-musllinux_1_1_x86_64.whl", hash = "sha256:dccd850a2e3863891871c9e16c54c742dba5470f5120ffed8152956e9e0a5e13"}, - {file = "mypy-1.6.0-cp312-cp312-win_amd64.whl", hash = "sha256:f8598307150b5722854f035d2e70a1ad9cc3c72d392c34fffd8c66d888c90f17"}, - {file = "mypy-1.6.0-cp38-cp38-macosx_10_9_x86_64.whl", hash = "sha256:fea451a3125bf0bfe716e5d7ad4b92033c471e4b5b3e154c67525539d14dc15a"}, - {file = "mypy-1.6.0-cp38-cp38-macosx_11_0_arm64.whl", hash = "sha256:e28d7b221898c401494f3b77db3bac78a03ad0a0fff29a950317d87885c655d2"}, - {file = "mypy-1.6.0-cp38-cp38-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:e4b7a99275a61aa22256bab5839c35fe8a6887781862471df82afb4b445daae6"}, - {file = "mypy-1.6.0-cp38-cp38-musllinux_1_1_x86_64.whl", hash = "sha256:7469545380dddce5719e3656b80bdfbb217cfe8dbb1438532d6abc754b828fed"}, - {file = "mypy-1.6.0-cp38-cp38-win_amd64.whl", hash = "sha256:7807a2a61e636af9ca247ba8494031fb060a0a744b9fee7de3a54bed8a753323"}, - {file = "mypy-1.6.0-cp39-cp39-macosx_10_9_x86_64.whl", hash = "sha256:d2dad072e01764823d4b2f06bc7365bb1d4b6c2f38c4d42fade3c8d45b0b4b67"}, - {file = "mypy-1.6.0-cp39-cp39-macosx_11_0_arm64.whl", hash = "sha256:b19006055dde8a5425baa5f3b57a19fa79df621606540493e5e893500148c72f"}, - {file = "mypy-1.6.0-cp39-cp39-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:31eba8a7a71f0071f55227a8057468b8d2eb5bf578c8502c7f01abaec8141b2f"}, - {file = "mypy-1.6.0-cp39-cp39-musllinux_1_1_x86_64.whl", hash = 
"sha256:8e0db37ac4ebb2fee7702767dfc1b773c7365731c22787cb99f507285014fcaf"}, - {file = "mypy-1.6.0-cp39-cp39-win_amd64.whl", hash = "sha256:c69051274762cccd13498b568ed2430f8d22baa4b179911ad0c1577d336ed849"}, - {file = "mypy-1.6.0-py3-none-any.whl", hash = "sha256:9e1589ca150a51d9d00bb839bfeca2f7a04f32cd62fad87a847bc0818e15d7dc"}, - {file = "mypy-1.6.0.tar.gz", hash = "sha256:4f3d27537abde1be6d5f2c96c29a454da333a2a271ae7d5bc7110e6d4b7beb3f"}, + {file = "mypy-1.6.1-cp310-cp310-macosx_10_9_x86_64.whl", hash = "sha256:e5012e5cc2ac628177eaac0e83d622b2dd499e28253d4107a08ecc59ede3fc2c"}, + {file = "mypy-1.6.1-cp310-cp310-macosx_11_0_arm64.whl", hash = "sha256:d8fbb68711905f8912e5af474ca8b78d077447d8f3918997fecbf26943ff3cbb"}, + {file = "mypy-1.6.1-cp310-cp310-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:21a1ad938fee7d2d96ca666c77b7c494c3c5bd88dff792220e1afbebb2925b5e"}, + {file = "mypy-1.6.1-cp310-cp310-musllinux_1_1_x86_64.whl", hash = "sha256:b96ae2c1279d1065413965c607712006205a9ac541895004a1e0d4f281f2ff9f"}, + {file = "mypy-1.6.1-cp310-cp310-win_amd64.whl", hash = "sha256:40b1844d2e8b232ed92e50a4bd11c48d2daa351f9deee6c194b83bf03e418b0c"}, + {file = "mypy-1.6.1-cp311-cp311-macosx_10_9_x86_64.whl", hash = "sha256:81af8adaa5e3099469e7623436881eff6b3b06db5ef75e6f5b6d4871263547e5"}, + {file = "mypy-1.6.1-cp311-cp311-macosx_11_0_arm64.whl", hash = "sha256:8c223fa57cb154c7eab5156856c231c3f5eace1e0bed9b32a24696b7ba3c3245"}, + {file = "mypy-1.6.1-cp311-cp311-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:a8032e00ce71c3ceb93eeba63963b864bf635a18f6c0c12da6c13c450eedb183"}, + {file = "mypy-1.6.1-cp311-cp311-musllinux_1_1_x86_64.whl", hash = "sha256:4c46b51de523817a0045b150ed11b56f9fff55f12b9edd0f3ed35b15a2809de0"}, + {file = "mypy-1.6.1-cp311-cp311-win_amd64.whl", hash = "sha256:19f905bcfd9e167159b3d63ecd8cb5e696151c3e59a1742e79bc3bcb540c42c7"}, + {file = "mypy-1.6.1-cp312-cp312-macosx_10_9_x86_64.whl", hash = "sha256:82e469518d3e9a321912955cc702d418773a2fd1e91c651280a1bda10622f02f"}, + {file = "mypy-1.6.1-cp312-cp312-macosx_11_0_arm64.whl", hash = "sha256:d4473c22cc296425bbbce7e9429588e76e05bc7342da359d6520b6427bf76660"}, + {file = "mypy-1.6.1-cp312-cp312-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:59a0d7d24dfb26729e0a068639a6ce3500e31d6655df8557156c51c1cb874ce7"}, + {file = "mypy-1.6.1-cp312-cp312-musllinux_1_1_x86_64.whl", hash = "sha256:cfd13d47b29ed3bbaafaff7d8b21e90d827631afda134836962011acb5904b71"}, + {file = "mypy-1.6.1-cp312-cp312-win_amd64.whl", hash = "sha256:eb4f18589d196a4cbe5290b435d135dee96567e07c2b2d43b5c4621b6501531a"}, + {file = "mypy-1.6.1-cp38-cp38-macosx_10_9_x86_64.whl", hash = "sha256:41697773aa0bf53ff917aa077e2cde7aa50254f28750f9b88884acea38a16169"}, + {file = "mypy-1.6.1-cp38-cp38-macosx_11_0_arm64.whl", hash = "sha256:7274b0c57737bd3476d2229c6389b2ec9eefeb090bbaf77777e9d6b1b5a9d143"}, + {file = "mypy-1.6.1-cp38-cp38-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:bbaf4662e498c8c2e352da5f5bca5ab29d378895fa2d980630656178bd607c46"}, + {file = "mypy-1.6.1-cp38-cp38-musllinux_1_1_x86_64.whl", hash = "sha256:bb8ccb4724f7d8601938571bf3f24da0da791fe2db7be3d9e79849cb64e0ae85"}, + {file = "mypy-1.6.1-cp38-cp38-win_amd64.whl", hash = "sha256:68351911e85145f582b5aa6cd9ad666c8958bcae897a1bfda8f4940472463c45"}, + {file = "mypy-1.6.1-cp39-cp39-macosx_10_9_x86_64.whl", hash = "sha256:49ae115da099dcc0922a7a895c1eec82c1518109ea5c162ed50e3b3594c71208"}, + {file = "mypy-1.6.1-cp39-cp39-macosx_11_0_arm64.whl", hash = 
"sha256:8b27958f8c76bed8edaa63da0739d76e4e9ad4ed325c814f9b3851425582a3cd"}, + {file = "mypy-1.6.1-cp39-cp39-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:925cd6a3b7b55dfba252b7c4561892311c5358c6b5a601847015a1ad4eb7d332"}, + {file = "mypy-1.6.1-cp39-cp39-musllinux_1_1_x86_64.whl", hash = "sha256:8f57e6b6927a49550da3d122f0cb983d400f843a8a82e65b3b380d3d7259468f"}, + {file = "mypy-1.6.1-cp39-cp39-win_amd64.whl", hash = "sha256:a43ef1c8ddfdb9575691720b6352761f3f53d85f1b57d7745701041053deff30"}, + {file = "mypy-1.6.1-py3-none-any.whl", hash = "sha256:4cbe68ef919c28ea561165206a2dcb68591c50f3bcf777932323bc208d949cf1"}, + {file = "mypy-1.6.1.tar.gz", hash = "sha256:4d01c00d09a0be62a4ca3f933e315455bde83f37f892ba4b08ce92f3cf44bcc1"}, ] mypy-boto3-athena = [ {file = "mypy-boto3-athena-1.28.36.tar.gz", hash = "sha256:a76df6aace3dc1d91b3f74640d617cd1b4802e5f348a22db2f16dfce0b01ee26"}, diff --git a/pyproject.toml b/pyproject.toml index adf6ff9b3f..242aee2aac 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -114,7 +114,7 @@ enlighten = "^1.11.2" alive-progress = "^3.1.1" pytest-console-scripts = "^1.4.1" pytest = "^6.2.4" -mypy = "^1.6.0" +mypy = "^1.6.1" flake8 = "^5.0.0" bandit = "^1.7.0" flake8-bugbear = "^22.0.0" @@ -159,6 +159,7 @@ optional = true pymysql = "^1.1.0" pypdf2 = "^3.0.1" pydoc-markdown = "^4.8.2" +connectorx="0.3.1" [build-system] requires = ["poetry-core>=1.0.8"] diff --git a/tests/common/data_writers/__init__.py b/tests/common/data_writers/__init__.py new file mode 100644 index 0000000000..e69de29bb2 diff --git a/tests/common/test_data_writers/test_buffered_writer.py b/tests/common/data_writers/test_buffered_writer.py similarity index 70% rename from tests/common/test_data_writers/test_buffered_writer.py rename to tests/common/data_writers/test_buffered_writer.py index 59a99a4ca6..85cfcb2d0c 100644 --- a/tests/common/test_data_writers/test_buffered_writer.py +++ b/tests/common/data_writers/test_buffered_writer.py @@ -1,8 +1,7 @@ import os -from typing import Iterator +from typing import Iterator, Set, Literal import pytest -from dlt.common.arithmetics import Decimal from dlt.common.data_writers.buffered import BufferedDataWriter, DataWriter from dlt.common.data_writers.exceptions import BufferedDataWriterClosed @@ -16,7 +15,10 @@ import datetime # noqa: 251 -def get_insert_writer(_format: TLoaderFileFormat = "insert_values", buffer_max_items: int = 10, disable_compression: bool = False) -> BufferedDataWriter[DataWriter]: +ALL_WRITERS: Set[Literal[TLoaderFileFormat]] = {"insert_values", "jsonl", "parquet", "arrow", "puae-jsonl"} + + +def get_writer(_format: TLoaderFileFormat = "insert_values", buffer_max_items: int = 10, disable_compression: bool = False) -> BufferedDataWriter[DataWriter]: caps = DestinationCapabilitiesContext.generic_capabilities() caps.preferred_loader_file_format = _format file_template = os.path.join(TEST_STORAGE_ROOT, f"{_format}.%s") @@ -24,7 +26,7 @@ def get_insert_writer(_format: TLoaderFileFormat = "insert_values", buffer_max_i def test_write_no_item() -> None: - with get_insert_writer() as writer: + with get_writer() as writer: pass assert writer.closed with pytest.raises(BufferedDataWriterClosed): @@ -54,7 +56,7 @@ def c3_doc(count: int) -> Iterator[DictStrAny]: return map(lambda x: {"col3": "col3_value"}, range(0, count)) # change schema before file first flush - with get_insert_writer(disable_compression=disable_compression) as writer: + with get_writer(disable_compression=disable_compression) as writer: 
     writer.write_data_item(list(c1_doc(8)), t1)
     assert writer._current_columns == t1  # but different instance
@@ -75,7 +77,7 @@ def c3_doc(count: int) -> Iterator[DictStrAny]:
     assert "1,0" in content[-1]
 
     # data would flush and schema change
-    with get_insert_writer() as writer:
+    with get_writer() as writer:
         writer.write_data_item(list(c1_doc(9)), t1)
         old_file = writer._file_name
         writer.write_data_item(list(c2_doc(1)), t2)  # rotates here
@@ -88,7 +90,7 @@ def c3_doc(count: int) -> Iterator[DictStrAny]:
     assert writer._buffered_items == []
 
     # file would rotate and schema change
-    with get_insert_writer() as writer:
+    with get_writer() as writer:
         writer.file_max_items = 10
         writer.write_data_item(list(c1_doc(9)), t1)
         old_file = writer._file_name
@@ -102,7 +104,7 @@ def c3_doc(count: int) -> Iterator[DictStrAny]:
     assert writer._buffered_items == []
 
     # schema change after flush rotates file
-    with get_insert_writer() as writer:
+    with get_writer() as writer:
         writer.write_data_item(list(c1_doc(11)), t1)
         writer.write_data_item(list(c2_doc(1)), t2)
         assert len(writer.closed_files) == 1
@@ -139,7 +141,7 @@ def c2_doc(count: int) -> Iterator[DictStrAny]:
         return map(lambda x: {"col1": x, "col2": x*2+1}, range(0, count))
 
     # change schema before file first flush
-    with get_insert_writer(_format="jsonl", disable_compression=disable_compression) as writer:
+    with get_writer(_format="jsonl", disable_compression=disable_compression) as writer:
         writer.write_data_item(list(c1_doc(15)), t1)
         # flushed
         assert writer._file is not None
@@ -158,19 +160,62 @@ def c2_doc(count: int) -> Iterator[DictStrAny]:
 def test_writer_requiring_schema(disable_compression: bool) -> None:
     # assertion on flushing
     with pytest.raises(AssertionError):
-        with get_insert_writer(disable_compression=disable_compression) as writer:
+        with get_writer(disable_compression=disable_compression) as writer:
             writer.write_data_item([{"col1": 1}], None)
     # just single schema is enough
     c1 = new_column("col1", "bigint")
     t1 = {"col1": c1}
-    with get_insert_writer(disable_compression=disable_compression) as writer:
+    with get_writer(disable_compression=disable_compression) as writer:
         writer.write_data_item([{"col1": 1}], None)
         writer.write_data_item([{"col1": 1}], t1)
 
 
 @pytest.mark.parametrize("disable_compression", [True, False], ids=["no_compression", "compression"])
 def test_writer_optional_schema(disable_compression: bool) -> None:
-    with get_insert_writer(_format="jsonl", disable_compression=disable_compression) as writer:
+    with get_writer(_format="jsonl", disable_compression=disable_compression) as writer:
         writer.write_data_item([{"col1": 1}], None)
         writer.write_data_item([{"col1": 1}], None)
+
+
+@pytest.mark.parametrize("writer_format", ALL_WRITERS - {"arrow"})
+def test_writer_items_count(writer_format: TLoaderFileFormat) -> None:
+    c1 = {"col1": new_column("col1", "bigint")}
+    with get_writer(_format=writer_format) as writer:
+        assert writer._buffered_items_count == 0
+        # single item
+        writer.write_data_item({"col1": 1}, columns=c1)
+        assert writer._buffered_items_count == 1
+        # list
+        writer.write_data_item([{"col1": 1}, {"col1": 2}], columns=c1)
+        assert writer._buffered_items_count == 3
+        writer._flush_items()
+        assert writer._buffered_items_count == 0
+        assert writer._writer.items_count == 3
+
+
+def test_writer_items_count_arrow() -> None:
+    import pyarrow as pa
+    c1 = {"col1": new_column("col1", "bigint")}
+    with get_writer(_format="arrow") as writer:
+        assert writer._buffered_items_count == 0
+        # single item
+        writer.write_data_item(pa.Table.from_pylist([{"col1": 1}]), columns=c1)
+        assert writer._buffered_items_count == 1
+        # single item with many rows
+        writer.write_data_item(pa.Table.from_pylist([{"col1": 1}, {"col1": 2}]), columns=c1)
+        assert writer._buffered_items_count == 3
+        # empty list
+        writer.write_data_item([], columns=c1)
+        assert writer._buffered_items_count == 3
+        # list with one item
+        writer.write_data_item([pa.Table.from_pylist([{"col1": 1}])], columns=c1)
+        assert writer._buffered_items_count == 4
+        # list with many items
+        writer.write_data_item(
+            [pa.Table.from_pylist([{"col1": 1}]), pa.Table.from_pylist([{"col1": 1}, {"col1": 2}])],
+            columns=c1
+        )
+        assert writer._buffered_items_count == 7
+        writer._flush_items()
+        assert writer._buffered_items_count == 0
+        assert writer._writer.items_count == 7
diff --git a/tests/common/test_data_writers/test_data_writers.py b/tests/common/data_writers/test_data_writers.py
similarity index 100%
rename from tests/common/test_data_writers/test_data_writers.py
rename to tests/common/data_writers/test_data_writers.py
diff --git a/tests/common/test_data_writers/test_parquet_writer.py b/tests/common/data_writers/test_parquet_writer.py
similarity index 100%
rename from tests/common/test_data_writers/test_parquet_writer.py
rename to tests/common/data_writers/test_parquet_writer.py
diff --git a/tests/common/schema/test_inference.py b/tests/common/schema/test_inference.py
index be58adab53..24c97219fc 100644
--- a/tests/common/schema/test_inference.py
+++ b/tests/common/schema/test_inference.py
@@ -372,7 +372,8 @@ def test_corece_null_value_over_not_null(schema: Schema) -> None:
 
 
 def test_infer_with_autodetection(schema: Schema) -> None:
-    c = schema._infer_column("ts", pendulum.now().timestamp())
+    # iso timestamp detection
+    c = schema._infer_column("ts", pendulum.now().isoformat())
     assert c["data_type"] == "timestamp"
     schema._type_detections = []
     c = schema._infer_column("ts", pendulum.now().timestamp())
diff --git a/tests/extract/test_incremental.py b/tests/extract/test_incremental.py
index 5f1ab9279f..2b6d26ba12 100644
--- a/tests/extract/test_incremental.py
+++ b/tests/extract/test_incremental.py
@@ -351,7 +351,7 @@ def test_composite_primary_key(item_type: TItemFormat) -> None:
         {'created_at': 2, 'isrc': 'AAA', 'market': 'DE'},
         {'created_at': 2, 'isrc': 'CCC', 'market': 'DE'},
         {'created_at': 2, 'isrc': 'DDD', 'market': 'DE'},
-        {'created_at': 2, 'isrc': 'CCC', 'market': 'DE'},
+        {'created_at': 1, 'isrc': 'CCC', 'market': 'DE'},
     ]
 
     source_items = data_to_item_format(item_type, data)
@@ -366,8 +366,8 @@ def some_data(created_at=dlt.sources.incremental('created_at')):
 
     with c.execute_query("SELECT created_at, isrc, market FROM some_data order by created_at, isrc, market") as cur:
         rows = cur.fetchall()
-    expected = [(1, 'AAA', 'DE'), (2, 'AAA', 'DE'), (2, 'BBB', 'DE'), (2, 'CCC', 'DE'), (2, 'CCC', 'US'), (2, 'DDD', 'DE')]
-    assert rows == expected
+    expected = {(1, 'AAA', 'DE'), (2, 'AAA', 'DE'), (2, 'BBB', 'DE'), (2, 'CCC', 'DE'), (2, 'CCC', 'US'), (2, 'DDD', 'DE'), (1, 'CCC', 'DE')}
+    assert set(rows) == expected
 
 
 @pytest.mark.parametrize("item_type", ALL_ITEM_FORMATS)
@@ -520,9 +520,6 @@ def some_data(last_timestamp=dlt.sources.incremental("item.timestamp")):
 
     assert py_ex.value.json_path == "item.timestamp"
 
-
-
-
 @pytest.mark.parametrize("item_type", ALL_ITEM_FORMATS)
 def test_filter_processed_items(item_type: TItemFormat) -> None:
     """Checks if already processed items are filtered out"""
diff --git a/tests/extract/utils.py b/tests/extract/utils.py
index 2465a1b1e2..b109cdbdd9 100644
--- a/tests/extract/utils.py
+++ b/tests/extract/utils.py
@@ -1,12 +1,11 @@
-from typing import Any, Optional, List, Union, Literal, get_args
+from typing import Any, Optional, List, Literal, get_args
 
 import pytest
-from itertools import zip_longest, chain
+from itertools import zip_longest
 
-from dlt.common.typing import TDataItem, TDataItems, TAny
+from dlt.common.typing import TDataItem, TDataItems
 from dlt.extract.extract import ExtractorStorage
-from dlt.extract.typing import ItemTransform, ItemTransformFunc
-from tests.cases import TArrowFormat
+from dlt.extract.typing import ItemTransform
 
 import pandas as pd
 from dlt.common.libs.pyarrow import pyarrow as pa
diff --git a/tests/load/filesystem/test_aws_credentials.py b/tests/load/filesystem/test_aws_credentials.py
index bf9e0bd681..b4f367b4e1 100644
--- a/tests/load/filesystem/test_aws_credentials.py
+++ b/tests/load/filesystem/test_aws_credentials.py
@@ -52,6 +52,18 @@ def test_aws_credentials_from_botocore(environment: Dict[str, str]) -> None:
     assert c.is_resolved()
     assert not c.is_partial()
 
+    s3_cred = c.to_s3fs_credentials()
+    assert s3_cred == {
+        "key": "fake_access_key",
+        "secret": "fake_secret_key",
+        "token": "fake_session_token",
+        "profile": None,
+        "endpoint_url": None,
+        "client_kwargs": {
+            "region_name": session.get_config_variable('region')
+        }
+    }
+
     c = AwsCredentials()
     c.parse_native_representation(botocore.session.get_session())
     assert c.is_resolved()
@@ -125,3 +137,4 @@ def set_aws_credentials_env(environment: Dict[str, str]) -> None:
     environment['AWS_ACCESS_KEY_ID'] = 'fake_access_key'
     environment['AWS_SECRET_ACCESS_KEY'] = 'fake_secret_key'
     environment['AWS_SESSION_TOKEN'] = 'fake_session_token'
+    environment['REGION_NAME'] = 'eu-central-1'
diff --git a/tests/pipeline/test_arrow_sources.py b/tests/pipeline/test_arrow_sources.py
index 759d5735c9..016eb1ee48 100644
--- a/tests/pipeline/test_arrow_sources.py
+++ b/tests/pipeline/test_arrow_sources.py
@@ -1,17 +1,13 @@
+import os
 import pytest
 import pandas as pd
-from typing import Any, Union
 import pyarrow as pa
 
 import dlt
-from dlt.common import Decimal
 from dlt.common.utils import uniq_id
-from dlt.common.exceptions import TerminalValueError
 from dlt.pipeline.exceptions import PipelineStepFailed
 
 from tests.cases import arrow_table_all_data_types, TArrowFormat
-from dlt.common.storages import LoadStorage
-
 
 
 @pytest.mark.parametrize(
@@ -111,3 +107,35 @@ def map_func(item):
 
     assert len(result) == 1
     assert result[0]['int'][0].as_py() == 1
+
+
+@pytest.mark.parametrize("item_type", ["pandas", "table", "record_batch"])
+def test_extract_normalize_file_rotation(item_type: TArrowFormat) -> None:
+    # do not extract state
+    os.environ["RESTORE_FROM_DESTINATION"] = "False"
+    # use parquet for dummy
+    os.environ["DESTINATION__LOADER_FILE_FORMAT"] = "parquet"
+
+    pipeline_name = "arrow_" + uniq_id()
+    pipeline = dlt.pipeline(pipeline_name=pipeline_name, destination="dummy")
+
+    item, rows = arrow_table_all_data_types(item_type)
+
+    @dlt.resource
+    def data_frames():
+        for _ in range(10):
+            yield item
+
+    # get buffer written and file rotated with each yielded frame
+    os.environ[f"SOURCES__{pipeline_name.upper()}__DATA_WRITER__BUFFER_MAX_ITEMS"] = str(len(rows))
+    os.environ[f"SOURCES__{pipeline_name.upper()}__DATA_WRITER__FILE_MAX_ITEMS"] = str(len(rows))
+
+    pipeline.extract(data_frames())
+    # ten parquet files
+    assert len(pipeline.list_extracted_resources()) == 10
+    info = pipeline.normalize(workers=3)
+    # with 10 * num rows
+    assert info.row_counts["data_frames"] == 10 * len(rows)
+    load_id = pipeline.list_normalized_load_packages()[0]
+    # 10 jobs on parquet files
+    assert len(pipeline.get_load_package_info(load_id).jobs["new_jobs"]) == 10
diff --git a/tests/pipeline/test_pipeline.py b/tests/pipeline/test_pipeline.py
index 48368a9df0..917dac75c4 100644
--- a/tests/pipeline/test_pipeline.py
+++ b/tests/pipeline/test_pipeline.py
@@ -9,7 +9,7 @@
 import pytest
 
 import dlt
-from dlt.common import json, sleep
+from dlt.common import json, sleep, pendulum
 from dlt.common.configuration.container import Container
 from dlt.common.configuration.specs.aws_credentials import AwsCredentials
 from dlt.common.configuration.specs.exceptions import NativeValueError
@@ -19,7 +19,7 @@
 from dlt.common.exceptions import DestinationHasFailedJobs, DestinationTerminalException, PipelineStateNotAvailable, UnknownDestinationModule
 from dlt.common.pipeline import PipelineContext
 from dlt.common.runtime.collector import AliveCollector, EnlightenCollector, LogCollector, TqdmCollector
-from dlt.common.schema.utils import new_column
+from dlt.common.schema.utils import new_column, new_table
 from dlt.common.utils import uniq_id
 
 from dlt.extract.exceptions import InvalidResourceDataTypeBasic, PipeGenInvalid, SourceExhausted
@@ -1142,3 +1142,63 @@ def generic(start=8):
 
     pipeline = dlt.pipeline(destination='duckdb')
     pipeline.run(generic(), loader_file_format=file_format)
+
+
+def test_remove_autodetect() -> None:
+    now = pendulum.now()
+
+    @dlt.source
+    def autodetect():
+        # add unix ts autodetection to current source schema
+        dlt.current.source_schema().add_type_detection("timestamp")
+        return dlt.resource([int(now.timestamp()), int(now.timestamp() + 1), int(now.timestamp() + 2)], name="numbers")
+
+    pipeline = dlt.pipeline(destination='duckdb')
+    pipeline.run(autodetect())
+
+    # unix ts recognized
+    assert pipeline.default_schema.get_table("numbers")["columns"]["value"]["data_type"] == "timestamp"
+
+    pipeline = pipeline.drop()
+
+    source = autodetect()
+    source.schema.remove_type_detection("timestamp")
+
+    pipeline = dlt.pipeline(destination='duckdb')
+    pipeline.run(source)
+
+    assert pipeline.default_schema.get_table("numbers")["columns"]["value"]["data_type"] == "bigint"
+
+
+def test_flattened_column_hint() -> None:
+    now = pendulum.now()
+
+    # @dlt.resource(columns=[{"name": "value__timestamp", "data_type": "timestamp"}])
+    @dlt.resource()
+    def flattened_dict():
+        # dlt.current.source_schema().add_type_detection("timestamp")
+
+        for delta in range(4):
+            yield {"delta": delta, "values": [{"Value": {"timestampValue": now.timestamp() + delta}}]}
+
+    @dlt.source
+    def nested_resource():
+        # we need to create a whole structure
+        dict_resource = flattened_dict()
+        # add table from resource
+        dlt.current.source_schema().update_table(dict_resource.compute_table_schema())
+        values_table = new_table(
+            dict_resource.name + "__values",
+            parent_table_name=dict_resource.name,
+            columns=[{"name": "value__timestamp_value", "data_type": "timestamp"}]
+        )
+        # and child table
+        dlt.current.source_schema().update_table(values_table)
+        return dict_resource
+
+    pipeline = dlt.pipeline(destination='duckdb')
+    pipeline.run(nested_resource())
+    # print(pipeline.default_schema.to_pretty_yaml())
+    assert pipeline.default_schema.get_table("flattened_dict__values")["columns"]["value__timestamp_value"]["data_type"] == "timestamp"
+    # make sure data is there
+    assert pipeline.last_trace.last_normalize_info.row_counts["flattened_dict__values"] == 4
\ No newline at end of file
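
Note on test_extract_normalize_file_rotation: it drives buffer flushing and file rotation through per-pipeline data writer settings, and the new tests assert that an Arrow table now contributes its num_rows to those limits instead of counting as a single item. A rough sketch of setting the same knobs outside the test suite follows; the pipeline name, the value 1000 and the duckdb destination are placeholders, not part of this change.

    import os
    import dlt

    pipeline_name = "my_arrow_pipeline"  # placeholder name
    # flush the buffer and rotate the intermediate file every 1000 rows (placeholder value)
    os.environ[f"SOURCES__{pipeline_name.upper()}__DATA_WRITER__BUFFER_MAX_ITEMS"] = "1000"
    os.environ[f"SOURCES__{pipeline_name.upper()}__DATA_WRITER__FILE_MAX_ITEMS"] = "1000"
    # keep arrow data in parquet intermediates, as the test does for its destination
    os.environ["DESTINATION__LOADER_FILE_FORMAT"] = "parquet"

    pipeline = dlt.pipeline(pipeline_name=pipeline_name, destination="duckdb")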
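Note on test_remove_autodetect: the schema-level autodetection switch it exercises can be flipped from an ordinary pipeline script as well. The sketch below is illustrative only; the source name, pipeline name and the duckdb destination are placeholders.

    import dlt
    from dlt.common import pendulum

    @dlt.source
    def epoch_numbers():
        # enable unix-timestamp autodetection on the schema of this source
        dlt.current.source_schema().add_type_detection("timestamp")
        return dlt.resource([int(pendulum.now().timestamp())], name="numbers")

    source = epoch_numbers()
    # the detection can be removed again before the run, as the test does
    # source.schema.remove_type_detection("timestamp")

    pipeline = dlt.pipeline(pipeline_name="autodetect_demo", destination="duckdb")
    pipeline.run(source)
    # with detection enabled the integer column loads as "timestamp", otherwise as "bigint"
    print(pipeline.default_schema.get_table("numbers")["columns"]["value"]["data_type"])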
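Note on test_flattened_column_hint: it shows how to hint a typed column on a nested (child) table before any data arrives. A condensed sketch of the same pattern, with placeholder resource, table and column names:

    import dlt
    from dlt.common.schema.utils import new_table

    @dlt.resource
    def orders():
        # placeholder payload; the "values" list becomes the child table orders__values
        yield {"id": 1, "values": [{"Value": {"timestampValue": 1695000000.0}}]}

    @dlt.source
    def hinted_source():
        resource = orders()
        # register the parent table computed from the resource ...
        dlt.current.source_schema().update_table(resource.compute_table_schema())
        # ... then hint the flattened column on the child table explicitly
        child = new_table(
            resource.name + "__values",
            parent_table_name=resource.name,
            columns=[{"name": "value__timestamp_value", "data_type": "timestamp"}],
        )
        dlt.current.source_schema().update_table(child)
        return resource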