diff --git a/.github/ISSUE_TEMPLATE/bug_report.yml b/.github/ISSUE_TEMPLATE/bug_report.yml index d3e1100128..870f8eabbe 100644 --- a/.github/ISSUE_TEMPLATE/bug_report.yml +++ b/.github/ISSUE_TEMPLATE/bug_report.yml @@ -7,7 +7,7 @@ body: attributes: value: | Thanks for reporting a bug for dlt! Please fill out the sections below. - If you are not sure if this is a bug or not, please join our [Slack](https://join.slack.com/t/dlthub-community/shared_invite/zt-1slox199h-HAE7EQoXmstkP_bTqal65g) + If you are not sure if this is a bug or not, please join our [Slack](https://join.slack.com/t/dlthub-community/shared_invite/zt-1n5193dbq-rCBmJ6p~ckpSFK4hCF2dYA) and ask in the #3-technical-help channel. - type: input attributes: @@ -34,7 +34,7 @@ body: attributes: label: Steps to reproduce description: > - How can we replicate the issue? If it's not straightforward to reproduce, please join our [Slack](https://join.slack.com/t/dlthub-community/shared_invite/zt-1slox199h-HAE7EQoXmstkP_bTqal65g) + How can we replicate the issue? If it's not straightforward to reproduce, please join our [Slack](https://join.slack.com/t/dlthub-community/shared_invite/zt-1n5193dbq-rCBmJ6p~ckpSFK4hCF2dYA) and ask in the #3-technical-help channel. placeholder: > Provide a step-by-step description of how to reproduce the problem you are running into. diff --git a/.github/ISSUE_TEMPLATE/config.yml b/.github/ISSUE_TEMPLATE/config.yml index 653b08cf7f..5756e2497c 100644 --- a/.github/ISSUE_TEMPLATE/config.yml +++ b/.github/ISSUE_TEMPLATE/config.yml @@ -2,5 +2,5 @@ blank_issues_enabled: true contact_links: - name: Ask a question or get support on dlt Slack - url: https://join.slack.com/t/dlthub-community/shared_invite/zt-1slox199h-HAE7EQoXmstkP_bTqal65g + url: https://join.slack.com/t/dlthub-community/shared_invite/zt-1n5193dbq-rCBmJ6p~ckpSFK4hCF2dYA about: Need help or support? Join our dlt community on Slack and get assistance. diff --git a/.github/ISSUE_TEMPLATE/feature_request.yml b/.github/ISSUE_TEMPLATE/feature_request.yml index 095a487771..a74c4774ef 100644 --- a/.github/ISSUE_TEMPLATE/feature_request.yml +++ b/.github/ISSUE_TEMPLATE/feature_request.yml @@ -7,7 +7,7 @@ body: attributes: value: | Thanks for suggesting a feature for dlt! - If you like to discuss your idea first, please join our [Slack](https://join.slack.com/t/dlthub-community/shared_invite/zt-1slox199h-HAE7EQoXmstkP_bTqal65g) + If you like to discuss your idea first, please join our [Slack](https://join.slack.com/t/dlthub-community/shared_invite/zt-1n5193dbq-rCBmJ6p~ckpSFK4hCF2dYA) and pose your questions in the #3-technical-help channel. For minor features and improvements, feel free to open a [pull request](https://github.com/dlt-hub/dlt/pulls) directly. 
- type: textarea diff --git a/.github/workflows/test_doc_snippets.yml b/.github/workflows/test_doc_snippets.yml index b2a2f241db..e158c2d669 100644 --- a/.github/workflows/test_doc_snippets.yml +++ b/.github/workflows/test_doc_snippets.yml @@ -56,7 +56,7 @@ jobs: - name: Install dependencies # if: steps.cached-poetry-dependencies.outputs.cache-hit != 'true' - run: poetry install --no-interaction -E duckdb -E weaviate --with docs --without airflow + run: poetry install --no-interaction -E duckdb -E weaviate -E parquet --with docs --without airflow - name: Run linter and tests run: make test-and-lint-snippets diff --git a/dlt/common/configuration/exceptions.py b/dlt/common/configuration/exceptions.py index cae666dab1..f019565013 100644 --- a/dlt/common/configuration/exceptions.py +++ b/dlt/common/configuration/exceptions.py @@ -1,6 +1,8 @@ +import os from typing import Any, Mapping, Type, Tuple, NamedTuple, Sequence from dlt.common.exceptions import DltException, TerminalException +from dlt.common.utils import main_module_file_path class LookupTrace(NamedTuple): @@ -48,6 +50,14 @@ def __str__(self) -> str: msg += f'\tfor field "{f}" config providers and keys were tried in following order:\n' for tr in field_traces: msg += f'\t\tIn {tr.provider} key {tr.key} was not found.\n' + # check if entry point is run with path. this is common problem so warn the user + main_path = main_module_file_path() + main_dir = os.path.dirname(main_path) + abs_main_dir = os.path.abspath(main_dir) + if abs_main_dir != os.getcwd(): + # directory was specified + msg += "WARNING: dlt looks for .dlt folder in your current working directory and your cwd (%s) is different from directory of your pipeline script (%s).\n" % (os.getcwd(), abs_main_dir) + msg += "If you keep your secret files in the same folder as your pipeline script but run your script from some other folder, secrets/configs will not be found\n" msg += "Please refer to https://dlthub.com/docs/general-usage/credentials for more information\n" return msg diff --git a/dlt/common/configuration/specs/aws_credentials.py b/dlt/common/configuration/specs/aws_credentials.py index 6ba661ae88..8c4aabc4ee 100644 --- a/dlt/common/configuration/specs/aws_credentials.py +++ b/dlt/common/configuration/specs/aws_credentials.py @@ -1,7 +1,7 @@ from typing import Optional, Dict, Any from dlt.common.exceptions import MissingDependencyException -from dlt.common.typing import TSecretStrValue +from dlt.common.typing import TSecretStrValue, DictStrAny from dlt.common.configuration.specs import CredentialsConfiguration, CredentialsWithDefault, configspec from dlt.common.configuration.specs.exceptions import InvalidBoto3Session from dlt import version @@ -19,13 +19,16 @@ class AwsCredentialsWithoutDefaults(CredentialsConfiguration): def to_s3fs_credentials(self) -> Dict[str, Optional[str]]: """Dict of keyword arguments that can be passed to s3fs""" - return dict( + credentials: DictStrAny = dict( key=self.aws_access_key_id, secret=self.aws_secret_access_key, token=self.aws_session_token, profile=self.profile_name, endpoint_url=self.endpoint_url, ) + if self.region_name: + credentials["client_kwargs"] = {"region_name": self.region_name} + return credentials def to_native_representation(self) -> Dict[str, Optional[str]]: """Return a dict that can be passed as kwargs to boto3 session""" diff --git a/dlt/common/data_writers/buffered.py b/dlt/common/data_writers/buffered.py index 98f1b51bee..5c93e22bc6 100644 --- a/dlt/common/data_writers/buffered.py +++ 
b/dlt/common/data_writers/buffered.py @@ -1,4 +1,5 @@ import gzip +from functools import reduce from typing import List, IO, Any, Optional, Type, TypeVar, Generic from dlt.common.utils import uniq_id @@ -58,6 +59,7 @@ def __init__( self._current_columns: TTableSchemaColumns = None self._file_name: str = None self._buffered_items: List[TDataItem] = [] + self._buffered_items_count: int = 0 self._writer: TWriter = None self._file: IO[Any] = None self._closed = False @@ -79,10 +81,20 @@ def write_data_item(self, item: TDataItems, columns: TTableSchemaColumns) -> Non if isinstance(item, List): # items coming in single list will be written together, not matter how many are there self._buffered_items.extend(item) + # update row count, if item supports "num_rows" it will be used to count items + if len(item) > 0 and hasattr(item[0], "num_rows"): + self._buffered_items_count += sum(tbl.num_rows for tbl in item) + else: + self._buffered_items_count += len(item) else: self._buffered_items.append(item) + # update row count, if item supports "num_rows" it will be used to count items + if hasattr(item, "num_rows"): + self._buffered_items_count += item.num_rows + else: + self._buffered_items_count += 1 # flush if max buffer exceeded - if len(self._buffered_items) >= self.buffer_max_items: + if self._buffered_items_count >= self.buffer_max_items: self._flush_items() # rotate the file if max_bytes exceeded if self._file: @@ -118,7 +130,7 @@ def _rotate_file(self) -> None: self._file_name = self.file_name_template % uniq_id(5) + "." + self._file_format_spec.file_extension def _flush_items(self, allow_empty_file: bool = False) -> None: - if len(self._buffered_items) > 0 or allow_empty_file: + if self._buffered_items_count > 0 or allow_empty_file: # we only open a writer when there are any items in the buffer and first flush is requested if not self._writer: # create new writer and write header @@ -131,7 +143,9 @@ def _flush_items(self, allow_empty_file: bool = False) -> None: # write buffer if self._buffered_items: self._writer.write_data(self._buffered_items) + # reset buffer and counter self._buffered_items.clear() + self._buffered_items_count = 0 def _flush_and_close_file(self) -> None: # if any buffered items exist, flush them diff --git a/dlt/common/data_writers/escape.py b/dlt/common/data_writers/escape.py index 1f44c96a73..c08ce31d88 100644 --- a/dlt/common/data_writers/escape.py +++ b/dlt/common/data_writers/escape.py @@ -89,6 +89,38 @@ def escape_mssql_literal(v: Any) -> Any: return str(v) +# TODO review escape logic -- this should resolve SQL injection / assertion errors +def escape_synapse_literal(v: Any) -> Any: + if isinstance(v, str): + v = v.replace("'", "''") # escape single quotes + # Further sanitizations, if needed, can be added here + elif isinstance(v, (datetime, date, time)): + return v.isoformat() + elif isinstance(v, (list, dict)): + return _escape_extended(json.dumps(v), escape_dict=SYNAPSE_ESCAPE_DICT, escape_re=SYNAPSE_SQL_ESCAPE_RE) + elif isinstance(v, bytes): + base_64_string = base64.b64encode(v).decode('ascii') + return f"""CAST('' AS XML).value('xs:base64Binary("{base_64_string}")', 'VARBINARY(MAX)')""" + elif isinstance(v, bool): + return str(int(v)) + return str(v) + + + +# TODO add references +SYNAPSE_ESCAPE_DICT = { + "'": "''", + '\n': "' + CHAR(10) + N'", + '\r': "' + CHAR(13) + N'", + '\t': "' + CHAR(9) + N'", +} + +SYNAPSE_SQL_ESCAPE_RE = _make_sql_escape_re(SYNAPSE_ESCAPE_DICT) + +def escape_synapse_identifier(v: str) -> str: + return '"' + v.replace('"', '""') + '"' 
+ + def escape_redshift_identifier(v: str) -> str: return '"' + v.replace('"', '""').replace("\\", "\\\\") + '"' diff --git a/dlt/common/data_writers/writers.py b/dlt/common/data_writers/writers.py index 73e13ec46f..daf1dbbe17 100644 --- a/dlt/common/data_writers/writers.py +++ b/dlt/common/data_writers/writers.py @@ -274,6 +274,8 @@ def write_data(self, rows: Sequence[Any]) -> None: self.writer.write_batch(row) else: raise ValueError(f"Unsupported type {type(row)}") + # count rows that got written + self.items_count += row.num_rows @classmethod def data_format(cls) -> TFileFormatSpec: diff --git a/dlt/common/schema/schema.py b/dlt/common/schema/schema.py index aef2032261..77a5ae8e8e 100644 --- a/dlt/common/schema/schema.py +++ b/dlt/common/schema/schema.py @@ -393,6 +393,18 @@ def update_normalizers(self) -> None: normalizers["json"] = normalizers["json"] or self._normalizers_config["json"] self._configure_normalizers(normalizers) + def add_type_detection(self, detection: TTypeDetections) -> None: + """Adds type auto detection to the schema.""" + if detection not in self.settings["detections"]: + self.settings["detections"].append(detection) + self._compile_settings() + + def remove_type_detection(self, detection: TTypeDetections) -> None: + """Removes type auto detection from the schema.""" + if detection in self.settings["detections"]: + self.settings["detections"].remove(detection) + self._compile_settings() + + def _infer_column(self, k: str, v: Any, data_type: TDataType = None, is_variant: bool = False) -> TColumnSchema: column_schema = TColumnSchema( name=k, diff --git a/dlt/common/schema/utils.py b/dlt/common/schema/utils.py index 32bd4ade1c..f2075ce85d 100644 --- a/dlt/common/schema/utils.py +++ b/dlt/common/schema/utils.py @@ -698,4 +698,4 @@ def standard_hints() -> Dict[TColumnHint, List[TSimpleRegex]]: def standard_type_detections() -> List[TTypeDetections]: - return ["timestamp", "iso_timestamp"] + return ["iso_timestamp"] diff --git a/dlt/destinations/filesystem/filesystem.py b/dlt/destinations/filesystem/filesystem.py index 49ad36dd16..766f384024 100644 --- a/dlt/destinations/filesystem/filesystem.py +++ b/dlt/destinations/filesystem/filesystem.py @@ -131,7 +131,7 @@ def initialize_storage(self, truncate_tables: Iterable[str] = None) -> None: for search_prefix in truncate_prefixes: if item.startswith(search_prefix): # NOTE: deleting in chunks on s3 does not raise on access denied, file non existing and probably other errors - logger.info(f"DEL {item}") + # logger.info(f"DEL {item}") # print(f"DEL {item}") self.fs_client.rm(item) except FileNotFoundError: diff --git a/dlt/destinations/insert_job_client.py b/dlt/destinations/insert_job_client.py index d5759db6c2..cfad66e2d5 100644 --- a/dlt/destinations/insert_job_client.py +++ b/dlt/destinations/insert_job_client.py @@ -114,6 +114,7 @@ def start_file_load(self, table: TTableSchema, file_path: str, load_id: str) -> # this is using sql_client internally and will raise a right exception if file_path.endswith("insert_values"): job = InsertValuesLoadJob(table["name"], file_path, self.sql_client) + return job # # TODO: implement indexes and primary keys for postgres diff --git a/dlt/destinations/synapse/README.md b/dlt/destinations/synapse/README.md new file mode 100644 index 0000000000..4348265088 --- /dev/null +++ b/dlt/destinations/synapse/README.md @@ -0,0 +1,5 @@ +# loader account setup + +1. Create new database `CREATE DATABASE dlt_data` +2. Create new user, set password `CREATE USER loader WITH PASSWORD 'loader';` +3.
Set as database owner (we could set lower permission) `ALTER DATABASE dlt_data OWNER TO loader` diff --git a/dlt/destinations/synapse/__init__.py b/dlt/destinations/synapse/__init__.py new file mode 100644 index 0000000000..f6deab8166 --- /dev/null +++ b/dlt/destinations/synapse/__init__.py @@ -0,0 +1,52 @@ +from typing import Type + +from dlt.common.schema.schema import Schema +from dlt.common.configuration import with_config, known_sections +from dlt.common.configuration.accessors import config +from dlt.common.data_writers.escape import escape_postgres_identifier, escape_synapse_literal, escape_synapse_identifier, escape_postgres_identifier, escape_mssql_literal +from dlt.common.destination import DestinationCapabilitiesContext +from dlt.common.destination.reference import JobClientBase, DestinationClientConfiguration +from dlt.common.arithmetics import DEFAULT_NUMERIC_PRECISION, DEFAULT_NUMERIC_SCALE +from dlt.common.wei import EVM_DECIMAL_PRECISION + +from dlt.destinations.synapse.configuration import SynapseClientConfiguration + + +@with_config(spec=SynapseClientConfiguration, sections=(known_sections.DESTINATION, "synapse",)) +def _configure(config: SynapseClientConfiguration = config.value) -> SynapseClientConfiguration: + return config + + +def capabilities() -> DestinationCapabilitiesContext: + # https://learn.microsoft.com/en-us/azure/synapse-analytics/sql-data-warehouse/sql-data-warehouse-service-capacity-limits + caps = DestinationCapabilitiesContext() + caps.preferred_loader_file_format = "insert_values" + caps.supported_loader_file_formats = ["insert_values"] + caps.preferred_staging_file_format = "parquet" + caps.supported_staging_file_formats = ["parquet"] + caps.escape_identifier = escape_synapse_identifier + caps.escape_literal = escape_synapse_literal + caps.decimal_precision = (DEFAULT_NUMERIC_PRECISION, DEFAULT_NUMERIC_SCALE) + caps.wei_precision = (DEFAULT_NUMERIC_PRECISION, 0) + # https://learn.microsoft.com/en-us/sql/sql-server/maximum-capacity-specifications-for-sql-server?view=sql-server-ver16&redirectedfrom=MSDN + caps.max_identifier_length = 128 + caps.max_column_identifier_length = 128 + caps.max_query_length = 4 * 1024 * 64 * 1024 + caps.is_max_query_length_in_bytes = True + caps.max_text_data_type_length = 2 ** 30 - 1 + caps.is_max_text_data_type_length_in_bytes = False + caps.supports_ddl_transactions = False + caps.max_rows_per_insert = 1000 + + return caps + + +def client(schema: Schema, initial_config: DestinationClientConfiguration = config.value) -> JobClientBase: + # import client when creating instance so capabilities and config specs can be accessed without dependencies installed + from dlt.destinations.synapse.synapse import SynapseClient + + return SynapseClient(schema, _configure(initial_config)) # type: ignore[arg-type] + + +def spec() -> Type[DestinationClientConfiguration]: + return SynapseClientConfiguration \ No newline at end of file diff --git a/dlt/destinations/synapse/configuration.py b/dlt/destinations/synapse/configuration.py new file mode 100644 index 0000000000..cbbcd5d780 --- /dev/null +++ b/dlt/destinations/synapse/configuration.py @@ -0,0 +1,89 @@ +from typing import Final, ClassVar, Any, List, Optional +from sqlalchemy.engine import URL + +from dlt.common.configuration import configspec +from dlt.common.configuration.specs import ConnectionStringCredentials +from dlt.common.utils import digest128 +from dlt.common.typing import TSecretValue +from dlt.common.exceptions import SystemConfigurationException + +from 
dlt.common.destination.reference import DestinationClientDwhWithStagingConfiguration + + +@configspec +class SynapseCredentials(ConnectionStringCredentials): + drivername: Final[str] = "sqlserver" # type: ignore + user: Final[str] = "dltadmin" + password: TSecretValue + host: str + port: int = 1433 + connect_timeout: int = 15 + odbc_driver: str = "ODBC Driver 17 for SQL Server" + + __config_gen_annotations__: ClassVar[List[str]] = ["port", "connect_timeout"] + + def parse_native_representation(self, native_value: Any) -> None: + # TODO: Support ODBC connection string or sqlalchemy URL + super().parse_native_representation(native_value) + self.connect_timeout = int(self.query.get("connect_timeout", self.connect_timeout)) + if not self.is_partial(): + self.resolve() + + def on_resolved(self) -> None: + self.database = self.database.lower() + + def to_url(self) -> URL: + url = super().to_url() + url.update_query_pairs([("connect_timeout", str(self.connect_timeout))]) + return url + + def on_partial(self) -> None: + self.odbc_driver = self._get_odbc_driver() + if not self.is_partial(): + self.resolve() + + def _get_odbc_driver(self) -> str: + if self.odbc_driver: + return self.odbc_driver + # Pick a default driver if available + supported_drivers = ['ODBC Driver 18 for SQL Server', 'ODBC Driver 17 for SQL Server'] + import pyodbc + available_drivers = pyodbc.drivers() + for driver in supported_drivers: + if driver in available_drivers: + return driver + docs_url = "https://learn.microsoft.com/en-us/sql/connect/odbc/download-odbc-driver-for-sql-server?view=sql-server-ver16" + raise SystemConfigurationException( + f"No supported ODBC driver found for MS SQL Server. " + f"See {docs_url} for information on how to install the '{supported_drivers[0]}' on your platform." 
+ ) + + def to_odbc_dsn(self) -> str: + params = { + "DRIVER": self.odbc_driver, + "SERVER": self.host, + "PORT": self.port, + "DATABASE": self.database, + "UID": self.username, + "PWD": self.password, + "LongAsMax": "yes", + "MARS_Connection": "yes" + } + if self.query: + params.update(self.query) + return ";".join([f"{k}={v}" for k, v in params.items()]) + + + +@configspec +class SynapseClientConfiguration(DestinationClientDwhWithStagingConfiguration): + destination_name: Final[str] = "synapse" # type: ignore + credentials: SynapseCredentials + + create_indexes: bool = False + + def fingerprint(self) -> str: + """Returns a fingerprint of host part of a connection string""" + if self.credentials and self.credentials.host: + return digest128(self.credentials.host) + return "" diff --git a/dlt/destinations/synapse/sql_client.py b/dlt/destinations/synapse/sql_client.py new file mode 100644 index 0000000000..b67fb4ca7c --- /dev/null +++ b/dlt/destinations/synapse/sql_client.py @@ -0,0 +1,252 @@ +import platform +import struct +from datetime import datetime, timedelta, timezone # noqa: I251 + +from dlt.common.destination import DestinationCapabilitiesContext + +import re + +import pyodbc + +from contextlib import contextmanager +from typing import Any, AnyStr, ClassVar, Iterator, Optional, Sequence + +from dlt.destinations.exceptions import DatabaseTerminalException, DatabaseTransientException, DatabaseUndefinedRelation +from dlt.destinations.typing import DBApi, DBApiCursor, DBTransaction +from dlt.destinations.sql_client import DBApiCursorImpl, SqlClientBase, raise_database_error, raise_open_connection_error + +from dlt.destinations.synapse.configuration import SynapseCredentials +from dlt.destinations.synapse import capabilities + +from typing import List, Tuple, Union, Any #Import List for INSERT query generation + +import json +def is_valid_json(s: str) -> bool: + """Check if a string is valid JSON.""" + if not (s.startswith('{') and s.endswith('}')) and not (s.startswith('[') and s.endswith(']')): + return False + try: + json.loads(s) + return True + except json.JSONDecodeError: + return False + + +def handle_datetimeoffset(dto_value: bytes) -> datetime: + # ref: https://github.com/mkleehammer/pyodbc/issues/134#issuecomment-281739794 + tup = struct.unpack("<6hI2h", dto_value) # e.g., (2017, 3, 16, 10, 35, 18, 500000000, -6, 0) + return datetime( + tup[0], tup[1], tup[2], tup[3], tup[4], tup[5], tup[6] // 1000, timezone(timedelta(hours=tup[7], minutes=tup[8])) + ) + +def is_valid_xml(s: str) -> bool: + # simple check for XML data + return s.strip().startswith('<') and s.strip().endswith('>') + +class PyOdbcSynapseClient(SqlClientBase[pyodbc.Connection], DBTransaction): + def execute_fragments(self, fragments: Sequence[AnyStr], *args: Any, **kwargs: Any) -> Optional[Sequence[Sequence[Any]]]: + # Check if fragments has one element and that element is a list/tuple with two elements + if len(fragments) == 1 and isinstance(fragments[0], (list, tuple)) and len(fragments[0]) == 2: + # Unpack the inner list/tuple into sql and params + sql, params = fragments[0] + # Execute the SQL query with the provided parameters + return self.execute_sql(sql, *params, **kwargs) + else: + # If fragments doesn't match the expected structure, fall back to the default behavior + return self.execute_sql("".join(fragments), *args, **kwargs) # type: ignore + + dbapi: ClassVar[DBApi] = pyodbc + capabilities: ClassVar[DestinationCapabilitiesContext] = capabilities() + + def __init__(self, dataset_name: str, 
credentials: SynapseCredentials) -> None: + super().__init__(credentials.database, dataset_name) + self._conn: pyodbc.Connection = None + self._transaction_in_progress = False # Transaction state flag + self.credentials = credentials + + + def open_connection(self) -> pyodbc.Connection: + try: + # Establish a connection + conn_str = ( + f"DRIVER={{{self.credentials.odbc_driver}}};" + f"SERVER={self.credentials.host};" + f"DATABASE={self.credentials.database};" + f"UID={self.credentials.user};" + f"PWD={self.credentials.password};" + ) + self._conn = pyodbc.connect(conn_str) + + # Add the converter for datetimeoffset + self._conn.add_output_converter(-155, handle_datetimeoffset) + + # Noting that autocommit is being set to True + self._conn.autocommit = True + + return self._conn + + except pyodbc.Error as e: + raise # re-raise the error without logging it + + @raise_open_connection_error + def close_connection(self) -> None: + if self._conn: + self._conn.close() + self._conn = None + + @contextmanager + def begin_transaction(self) -> Iterator[DBTransaction]: + try: + self._conn.autocommit = False + yield self + self.commit_transaction() + except Exception: + self.rollback_transaction() + raise + + @raise_database_error + def commit_transaction(self) -> None: + self._conn.commit() + self._conn.autocommit = True + + @raise_database_error + def rollback_transaction(self) -> None: + self._conn.rollback() + self._conn.autocommit = True + + + @property + def native_connection(self) -> pyodbc.Connection: + return self._conn + + def drop_dataset(self) -> None: + # MS Sql doesn't support DROP ... CASCADE, drop tables in the schema first + # Drop all views + rows = self.execute_sql( + "SELECT table_name FROM information_schema.views WHERE table_schema = %s;", self.dataset_name + ) + view_names = [row[0] for row in rows] + self._drop_views(*view_names) + # Drop all tables + rows = self.execute_sql( + "SELECT table_name FROM information_schema.tables WHERE table_schema = %s;", self.dataset_name + ) + table_names = [row[0] for row in rows] + self.drop_tables(*table_names) + + self.execute_sql("DROP SCHEMA %s;" % self.fully_qualified_dataset_name()) + + def table_exists(self, table_name: str) -> bool: + query = """ + SELECT COUNT(*) + FROM INFORMATION_SCHEMA.TABLES + WHERE TABLE_SCHEMA = ? AND TABLE_NAME = ? 
+ """ + result = self.execute_sql(query, self.dataset_name, table_name) + return result[0][0] > 0 + + def drop_tables(self, *tables: str) -> None: + if not tables: + return + for table in tables: + if self.table_exists(table): + self.execute_sql(f"DROP TABLE {self.make_qualified_table_name(table)};") + + def _drop_views(self, *tables: str) -> None: + if not tables: + return + statements = [f"DROP VIEW {self.make_qualified_table_name(table)};" for table in tables] + self.execute_fragments(statements) + + def set_input_sizes(self, *args): + if len(args) == 1 and isinstance(args[0], tuple): + args = args[0] + + args = list(args) # Convert tuple to list for modification + input_sizes = [] + for index, arg in enumerate(args): + #print(f"Processing arg {index}: {arg}") # Print the argument being processed + + if isinstance(arg, str): + if arg.isdigit(): # Check if the string is a valid integer + input_sizes.append((pyodbc.SQL_BIGINT, 0, 0)) # Set input size for bigint columns + #print(f"Identified bigint arg at index {index}") # Print when a bigint argument is identified + elif is_valid_json(arg): + input_sizes.append((pyodbc.SQL_WVARCHAR, 0, 0)) # Set input size for JSON columns + else: + input_sizes.append((pyodbc.SQL_WVARCHAR, 0, 0)) # Set input size for string columns + elif isinstance(arg, (bytes, bytearray)): + input_sizes.append((pyodbc.SQL_VARBINARY, 0, 0)) # Set input size for binary columns + else: + input_sizes.append(None) # Default handling for other data types + + return input_sizes, tuple(args) + + + def execute_sql(self, sql: AnyStr, *args: Any, **kwargs: Any) -> Optional[Sequence[Sequence[Any]]]: + print("SQL:", sql) + print("Arguments:", args) + with self.execute_query(sql, *args, **kwargs) as curr: + if curr.description is None: + return None + else: + f = curr.fetchall() + return f + + @contextmanager + @raise_database_error + def execute_query(self, query: AnyStr, *args: Any, **kwargs: Any) -> Iterator[DBApiCursor]: + print("Query:", query) + print("Args for query:", args) + + assert isinstance(query, str) + + if kwargs: + raise NotImplementedError("pyodbc does not support named parameters in queries") + + curr = self._conn.cursor() + + # Set the converter for datetimeoffset + self._conn.add_output_converter(-155, handle_datetimeoffset) + + # #TODO confirm this implementation ok ... MSSQL subclass? 
+ # Set input sizes for Synapse to handle varchar(max) instead of ntext + input_sizes, args = self.set_input_sizes(*args) + curr.setinputsizes(input_sizes) + + try: + query = query.replace('%s', '?') # Updated line + + curr.execute(query, *args) # No flattening, just unpack args directly + yield DBApiCursorImpl(curr) # type: ignore[abstract] + + except pyodbc.Error as outer: + raise outer + + finally: + curr.close() + + def fully_qualified_dataset_name(self, escape: bool = True) -> str: + return self.capabilities.escape_identifier(self.dataset_name) if escape else self.dataset_name + + @classmethod + def _make_database_exception(cls, ex: Exception) -> Exception: + if isinstance(ex, pyodbc.ProgrammingError): + if ex.args[0] == "42S02": + return DatabaseUndefinedRelation(ex) + if ex.args[1] == "HY000": + return DatabaseTransientException(ex) + elif ex.args[0] == "42000": + if "(15151)" in ex.args[1]: + return DatabaseUndefinedRelation(ex) + return DatabaseTransientException(ex) + elif isinstance(ex, pyodbc.OperationalError): + return DatabaseTransientException(ex) + elif isinstance(ex, pyodbc.Error): + if ex.args[0] == "07002": # incorrect number of arguments supplied + return DatabaseTransientException(ex) + return DatabaseTerminalException(ex) + + @staticmethod + def is_dbapi_exception(ex: Exception) -> bool: + return isinstance(ex, pyodbc.Error) \ No newline at end of file diff --git a/dlt/destinations/synapse/synapse.py b/dlt/destinations/synapse/synapse.py new file mode 100644 index 0000000000..5b238358ee --- /dev/null +++ b/dlt/destinations/synapse/synapse.py @@ -0,0 +1,283 @@ +from typing import ClassVar, Dict, Optional, Sequence, List, Any, Tuple, Iterator + +from dlt.common.wei import EVM_DECIMAL_PRECISION +from dlt.common.destination.reference import NewLoadJob +from dlt.common.destination import DestinationCapabilitiesContext +from dlt.common.data_types import TDataType +from dlt.common.schema import TColumnSchema, TColumnHint, Schema +from dlt.common.schema.typing import TTableSchema, TColumnType, TTableFormat +from dlt.common.storages import FileStorage +from dlt.common.utils import uniq_id + +from dlt.destinations.sql_jobs import SqlStagingCopyJob, SqlMergeJob + +from dlt.destinations.insert_job_client import InsertValuesJobClient, InsertValuesLoadJob + +from dlt.destinations.synapse import capabilities +from dlt.destinations.synapse.sql_client import PyOdbcSynapseClient +from dlt.destinations.synapse.configuration import SynapseClientConfiguration +from dlt.destinations.sql_client import SqlClientBase +from dlt.common.schema.typing import COLUMN_HINTS +from dlt.destinations.type_mapping import TypeMapper + +from dlt.common.data_writers.escape import escape_synapse_identifier, escape_synapse_literal + + +import re + +# TODO remove logging +import logging # Import the logging module if it's not already imported +logger = logging.getLogger(__name__) + + +class SynapseTypeMapper(TypeMapper): + sct_to_unbound_dbt = { + "complex": "nvarchar(max)", + "text": "nvarchar(max)", + "double": "float", + "bool": "bit", + "bigint": "bigint", + "binary": "varbinary(max)", + "date": "date", + "timestamp": "datetimeoffset", + "time": "time", + } + + sct_to_dbt = { + "complex": "nvarchar(%i)", + "text": "nvarchar(%i)", + "timestamp": "datetimeoffset(%i)", + "binary": "varbinary(%i)", + "decimal": "decimal(%i,%i)", + "time": "time(%i)", + "wei": "decimal(%i,%i)" + } + + dbt_to_sct = { + "nvarchar": "text", + "float": "double", + "bit": "bool", + "datetimeoffset": "timestamp", + "date": "date", + 
"bigint": "bigint", + "varbinary": "binary", + "decimal": "decimal", + "time": "time", + "tinyint": "bigint", + "smallint": "bigint", + "int": "bigint", + } + +HINT_TO_SYNAPSE_ATTR: Dict[TColumnHint, str] = { + "unique": "UNIQUE" +} + +class SynapseStagingCopyJob(SqlStagingCopyJob): + + @classmethod + def generate_sql(cls, table_chain: Sequence[TTableSchema], sql_client: SqlClientBase[Any]) -> List[str]: + sql: List[str] = [] + for table in table_chain: + with sql_client.with_staging_dataset(staging=True): + staging_table_name = sql_client.make_qualified_table_name(table["name"]) + table_name = sql_client.make_qualified_table_name(table["name"]) + # drop destination table + sql.append(f"DROP TABLE {table_name};") + # moving staging table to destination schema + sql.append(f"ALTER SCHEMA {sql_client.fully_qualified_dataset_name()} TRANSFER {staging_table_name};") + return sql + + +class SynapseMergeJob(SqlMergeJob): + @classmethod + def gen_key_table_clauses(cls, root_table_name: str, staging_root_table_name: str, key_clauses: Sequence[str], for_delete: bool) -> List[str]: + """Generate sql clauses that may be used to select or delete rows in root table of destination dataset + """ + if for_delete: + # MS SQL doesn't support alias in DELETE FROM + return [f"FROM {root_table_name} WHERE EXISTS (SELECT 1 FROM {staging_root_table_name} WHERE {' OR '.join([c.format(d=root_table_name,s=staging_root_table_name) for c in key_clauses])})"] + return SqlMergeJob.gen_key_table_clauses(root_table_name, staging_root_table_name, key_clauses, for_delete) + + @classmethod + def _to_temp_table(cls, select_sql: str, temp_table_name: str) -> str: + return f"SELECT * INTO {temp_table_name} FROM ({select_sql}) as t;" + + @classmethod + def _new_temp_table_name(cls, name_prefix: str) -> str: + name = SqlMergeJob._new_temp_table_name(name_prefix) + return '#' + name + + +class SynapseInsertValuesLoadJob(InsertValuesLoadJob): + + def __init__(self, table_name: str, file_path: str, sql_client: SqlClientBase[Any]) -> None: + # First, set any attributes specific to this subclass + self._sql_client = sql_client + self._file_name = FileStorage.get_file_name_from_file_path(file_path) + + # Then, call the parent class's __init__ method with the required arguments + super().__init__(table_name, file_path, sql_client) + + def _insert(self, qualified_table_name: str, file_path: str) -> Iterator[List[str]]: + # First, get the original SQL fragments + original_sql_fragments = super()._insert(qualified_table_name, file_path) + if original_sql_fragments is None: + raise ValueError("Superclass _insert method returned None instead of an iterable") + print(f'Original SQL fragments: {original_sql_fragments}') # Debug print + print(f'Type of Original SQL fragments: {type(original_sql_fragments)}') # Additional debug print + + # Initialize insert_sql as an empty list + insert_sql = [] + + # Now, adapt each SQL fragment for Synapse using the generate_insert_query method + for original_sql in original_sql_fragments: + print(f'Processing original SQL fragment: {original_sql}') # Debug print + + # Parse the original SQL to extract table name, columns, and rows + # This is a simplified example, you'll need a more robust way to parse the SQL + # TODO pyodbc adding \n, cleaning up string for processing + original_sql_joined = ''.join(original_sql).replace('\n', '') + table_name_match = re.search(r'INSERT INTO (.*?)\(', original_sql_joined) + columns_match = re.search(r'\((.*?)\)', original_sql_joined) + values_match = 
re.search(r'VALUES(.*?);', original_sql_joined, re.DOTALL) + + if table_name_match and columns_match and values_match: + table_name = table_name_match.group(1) + columns_str = columns_match.group(1) + values_str = values_match.group(1) + + # Split columns and values strings into lists + columns = [col.strip() for col in columns_str.split(',')] + values = [[val.strip() for val in value_group.split(',')] for value_group in values_str.split('),(')] + + # Call generate_insert_query with the extracted values + adapted_sql, param_values = self.generate_insert_query(table_name, columns, values) + + insert_sql.append([adapted_sql, param_values]) + + yield insert_sql + + # In Azure Synapse, must break out SELECT statements for multi-row INSERT + # https://stackoverflow.com/questions/36141006/how-to-insert-multiple-rows-into-sql-server-parallel-data-warehoue-table + def generate_insert_query(self, table_name: str, columns: List[str], rows: List[List[Any]]) -> Tuple[str, List[Any]]: + print("THESE ARE THE ROWS FOR GENERATING INSERT QUERY: " + str(rows)) + + try: + escaped_column_names = ', '.join(columns) # This line is unchanged + + # Placeholder for each value in a row + placeholder = ', '.join(['?' for _ in columns]) + + # Process each row individually + sql_fragments = [] + param_values = [] + for row in rows: + # Remove leading and trailing parenthesis from each element in the row + clean_row = [elem.strip('()') for elem in row] + + # Create the SQL fragment for this row + sql_fragments.append(f"SELECT {placeholder}") + + # Extend param_values with the cleaned values + param_values.extend(clean_row) # Changed this line + + # Combine the individual SQL fragments + all_select_statements = " UNION ALL ".join(sql_fragments) + print("Combined SELECT statements: " + all_select_statements) # This will print the combined SELECT statements + + # Building the final SQL query + new_sql = f'INSERT INTO {table_name}({escaped_column_names}) {all_select_statements}' + print("NEW SQL STRUCTURE: " + str(new_sql)) # This will print the new SQL structure + + print("Param values: " + str(param_values)) # This will print the parameters as a flat list + + return new_sql, param_values # This will return the parameters as a flat list + + except Exception as e: + logger.error(f"Failed to generate insert query: {e}") + raise + + +class SynapseClient(InsertValuesJobClient): + #Synapse does not support multi-row inserts using a single INSERT INTO statement + + capabilities: ClassVar[DestinationCapabilitiesContext] = capabilities() + + def __init__(self, schema: Schema, config: SynapseClientConfiguration) -> None: + sql_client = PyOdbcSynapseClient( + config.normalize_dataset_name(schema), + config.credentials + ) + super().__init__(schema, config, sql_client) + self.config: SynapseClientConfiguration = config + self.sql_client = sql_client + self.active_hints = HINT_TO_SYNAPSE_ATTR if self.config.create_indexes else {} + self.type_mapper = SynapseTypeMapper(self.capabilities) + + def _create_merge_job(self, table_chain: Sequence[TTableSchema]) -> NewLoadJob: + return SynapseMergeJob.from_table_chain(table_chain, self.sql_client) + + def _make_add_column_sql(self, new_columns: Sequence[TColumnSchema]) -> List[str]: + # Override because mssql requires multiple columns in a single ADD COLUMN clause + return ["ADD \n" + ",\n".join(self._get_column_def_sql(c) for c in new_columns)] + + def _get_table_update_sql(self, table_name: str, new_columns: Sequence[TColumnSchema], generate_alter: bool) -> List[str]: + # build sql + 
canonical_name = self.sql_client.make_qualified_table_name(table_name) + sql_result: List[str] = [] + if not generate_alter: + # build CREATE + sql = f"CREATE TABLE {canonical_name} (\n" + sql += ",\n".join([self._get_column_def_sql(c) for c in new_columns]) + sql += ") WITH (HEAP)" + sql_result.append(sql) + else: + sql_base = f"ALTER TABLE {canonical_name}\n" + add_column_statements = self._make_add_column_sql(new_columns) + if self.capabilities.alter_add_multi_column: + column_sql = ",\n" + sql_result.append(sql_base + column_sql.join(add_column_statements)) + else: + # build ALTER as separate statement for each column (redshift limitation) + sql_result.extend([sql_base + col_statement for col_statement in add_column_statements]) + + # scan columns to get hints + if generate_alter: + # no hints may be specified on added columns + for hint in COLUMN_HINTS: + if any(c.get(hint, False) is True for c in new_columns): + hint_columns = [self.capabilities.escape_identifier(c["name"]) for c in new_columns if c.get(hint, False)] + if hint == "not_null": + logger.warning(f"Column(s) {hint_columns} with NOT NULL are being added to existing table {canonical_name}." + " If there's data in the table the operation will fail.") + else: + logger.warning(f"Column(s) {hint_columns} with hint {hint} are being added to existing table {canonical_name}." + " Several hint types may not be added to existing tables.") + return sql_result + + def _create_merge_followup_jobs(self, table_chain: Sequence[TTableSchema]) -> List[NewLoadJob]: + return [SynapseMergeJob.from_table_chain(table_chain, self.sql_client)] + + def _make_add_column_sql(self, new_columns: Sequence[TColumnSchema], table_format: TTableFormat = None) -> List[str]: + # Override because mssql requires multiple columns in a single ADD COLUMN clause + return ["ADD \n" + ",\n".join(self._get_column_def_sql(c, table_format) for c in new_columns)] + + def _get_column_def_sql(self, c: TColumnSchema, table_format: TTableFormat = None) -> str: + sc_type = c["data_type"] + if sc_type == "text" and c.get("unique"): + # MSSQL does not allow index on large TEXT columns + db_type = "nvarchar(%i)" % (c.get("precision") or 900) + else: + db_type = self.type_mapper.to_db_type(c) + + hints_str = " ".join(self.active_hints.get(h, "") for h in self.active_hints.keys() if c.get(h, False) is True) + column_name = self.capabilities.escape_identifier(c["name"]) + return f"{column_name} {db_type} {hints_str} {self._gen_not_null(c['nullable'])}" + + def _create_replace_followup_jobs(self, table_chain: Sequence[TTableSchema]) -> List[NewLoadJob]: + if self.config.replace_strategy == "staging-optimized": + return [SynapseStagingCopyJob.from_table_chain(table_chain, self.sql_client)] + return super()._create_replace_followup_jobs(table_chain) + + def _from_db_type(self, pq_t: str, precision: Optional[int], scale: Optional[int]) -> TColumnType: + return self.type_mapper.from_db_type(pq_t, precision, scale) diff --git a/dlt/extract/decorators.py b/dlt/extract/decorators.py index ec3f2bb47b..eb8a08e3d1 100644 --- a/dlt/extract/decorators.py +++ b/dlt/extract/decorators.py @@ -384,7 +384,7 @@ def decorator(f: Callable[TResourceFunParams, Any]) -> Callable[TResourceFunPara spec=spec, sections=resource_sections, sections_merge_style=ConfigSectionContext.resource_merge_style, include_defaults=spec is not None ) is_inner_resource = is_inner_callable(f) - if conf_f != incr_f and is_inner_resource: + if conf_f != incr_f and is_inner_resource and not standalone: raise 
ResourceInnerCallableConfigWrapDisallowed(resource_name, source_section) # get spec for wrapped function SPEC = get_fun_spec(conf_f) @@ -494,7 +494,7 @@ def transformer( selected: bool = True, spec: Type[BaseConfiguration] = None, standalone: Literal[True] = True -) -> Callable[..., DltResource]: # TODO: change back to Callable[TResourceFunParams, DltResource] when mypy 1.6 is fixed +) -> Callable[TResourceFunParams, DltResource]: # TODO: change back to Callable[TResourceFunParams, DltResource] when mypy 1.6 is fixed ... def transformer( diff --git a/dlt/extract/extract.py b/dlt/extract/extract.py index 7f8142e807..fdca26fcfe 100644 --- a/dlt/extract/extract.py +++ b/dlt/extract/extract.py @@ -164,14 +164,14 @@ def write_empty_file(self, table_name: str) -> None: table_name = self.schema.naming.normalize_table_identifier(table_name) self.storage.write_empty_file(self.extract_id, self.schema.name, table_name, None) - def _write_item(self, table_name: str, resource_name: str, items: TDataItems) -> None: + def _write_item(self, table_name: str, resource_name: str, items: TDataItems, columns: TTableSchemaColumns = None) -> None: # normalize table name before writing so the name match the name in schema # note: normalize function should be cached so there's almost no penalty on frequent calling # note: column schema is not required for jsonl writer used here table_name = self.schema.naming.normalize_identifier(table_name) self.collector.update(table_name) self.resources_with_items.add(resource_name) - self.storage.write_data_item(self.extract_id, self.schema.name, table_name, items, None) + self.storage.write_data_item(self.extract_id, self.schema.name, table_name, items, columns) def _write_dynamic_table(self, resource: DltResource, item: TDataItem) -> None: table_name = resource._table_name_hint_fun(item) @@ -212,6 +212,9 @@ def write_table(self, resource: DltResource, items: TDataItems, meta: Any) -> No ] super().write_table(resource, items, meta) + def _write_item(self, table_name: str, resource_name: str, items: TDataItems, columns: TTableSchemaColumns = None) -> None: + super()._write_item(table_name, resource_name, items, self.dynamic_tables[table_name][0]["columns"]) + def _write_static_table(self, resource: DltResource, table_name: str, items: TDataItems) -> None: existing_table = self.dynamic_tables.get(table_name) if existing_table is not None: diff --git a/dlt/extract/incremental/transform.py b/dlt/extract/incremental/transform.py index c77fb97b9f..b1dffe6b28 100644 --- a/dlt/extract/incremental/transform.py +++ b/dlt/extract/incremental/transform.py @@ -1,5 +1,5 @@ -from datetime import datetime # noqa: I251 -from typing import Optional, Tuple, Protocol, Mapping, Union, List +from datetime import datetime, date # noqa: I251 +from typing import Optional, Tuple, List try: import pandas as pd @@ -137,8 +137,9 @@ def __call__( return row, start_out_of_range, end_out_of_range - class ArrowIncremental(IncrementalTransformer): + _dlt_index = "_dlt_index" + def unique_values( self, item: "TAnyArrowItem", @@ -148,28 +149,34 @@ def unique_values( if not unique_columns: return [] item = item - indices = item["_dlt_index"].to_pylist() + indices = item[self._dlt_index].to_pylist() rows = item.select(unique_columns).to_pylist() return [ (index, digest128(json.dumps(row, sort_keys=True))) for index, row in zip(indices, rows) ] - def _deduplicate(self, tbl: "pa.Table", unique_columns: Optional[List[str]], aggregate: str, cursor_path: str) -> "pa.Table": - if unique_columns is None: - return tbl - 
group_cols = unique_columns + [cursor_path] - tbl = tbl.append_column("_dlt_index", pa.array(np.arange(tbl.num_rows))) - try: - tbl = tbl.filter( - pa.compute.is_in( - tbl['_dlt_index'], - tbl.group_by(group_cols).aggregate( - [("_dlt_index", "one"), (cursor_path, aggregate)] - )['_dlt_index_one'] - ) - ) - except KeyError as e: - raise IncrementalPrimaryKeyMissing(self.resource_name, unique_columns[0], tbl) from e + def _deduplicate(self, tbl: "pa.Table", unique_columns: Optional[List[str]], aggregate: str, cursor_path: str) -> "pa.Table": + """Creates unique index if necessary.""" + # create unique index if necessary + if self._dlt_index not in tbl.schema.names: + tbl = tbl.append_column(self._dlt_index, pa.array(np.arange(tbl.num_rows))) + # code below deduplicates groups that include the cursor column in the group id. that was just artifact of + # json incremental and there's no need to duplicate it here + + # if unique_columns is None: + # return tbl + # group_cols = unique_columns + [cursor_path] + # try: + # tbl = tbl.filter( + # pa.compute.is_in( + # tbl[self._dlt_index], + # tbl.group_by(group_cols).aggregate( + # [(self._dlt_index, "one"), (cursor_path, aggregate)] + # )[f'{self._dlt_index}_one'] + # ) + # ) + # except KeyError as e: + # raise IncrementalPrimaryKeyMissing(self.resource_name, unique_columns[0], tbl) from e return tbl def __call__( @@ -180,6 +187,25 @@ def __call__( if is_pandas: tbl = pa.Table.from_pandas(tbl) + primary_key = self.primary_key(tbl) if callable(self.primary_key) else self.primary_key + if primary_key: + # create a list of unique columns + if isinstance(primary_key, str): + unique_columns = [primary_key] + else: + unique_columns = list(primary_key) + # check if primary key components are in the table + for pk in unique_columns: + if pk not in tbl.schema.names: + raise IncrementalPrimaryKeyMissing(self.resource_name, pk, tbl) + # use primary key as unique index + if isinstance(primary_key, str): + self._dlt_index = primary_key + elif primary_key is None: + unique_columns = tbl.column_names + else: # deduplicating is disabled + unique_columns = None + start_out_of_range = end_out_of_range = False if not tbl: # row is None or empty arrow table return tbl, start_out_of_range, end_out_of_range @@ -206,24 +232,19 @@ def __call__( cursor_path = str(self.cursor_path) # The new max/min value try: - row_value = compute(tbl[cursor_path]).as_py() + orig_row_value = compute(tbl[cursor_path]) + row_value = orig_row_value.as_py() + # dates are not represented as datetimes but I see connector-x represents + # datetimes as dates and keeping the exact time inside. probably a bug + # but can be corrected this way + if isinstance(row_value, date) and not isinstance(row_value, datetime): + row_value = pendulum.from_timestamp(orig_row_value.cast(pa.int64()).as_py() / 1000) except KeyError as e: raise IncrementalCursorPathMissing( self.resource_name, cursor_path, tbl, f"Column name {str(cursor_path)} was not found in the arrow table. Note nested JSON paths are not supported for arrow tables and dataframes, the incremental cursor_path must be a column name." 
) from e - primary_key = self.primary_key(tbl) if callable(self.primary_key) else self.primary_key - if primary_key: - if isinstance(primary_key, str): - unique_columns = [primary_key] - else: - unique_columns = list(primary_key) - elif primary_key is None: - unique_columns = tbl.column_names - else: # deduplicating is disabled - unique_columns = None - # If end_value is provided, filter to include table rows that are "less" than end_value if self.end_value is not None: tbl = tbl.filter(end_compare(tbl[cursor_path], self.end_value)) @@ -247,7 +268,7 @@ def __call__( unique_values = [(i, uq_val) for i, uq_val in unique_values if uq_val in self.incremental_state['unique_hashes']] remove_idx = pa.array(i for i, _ in unique_values) # Filter the table - tbl = tbl.filter(pa.compute.invert(pa.compute.is_in(tbl["_dlt_index"], remove_idx))) + tbl = tbl.filter(pa.compute.invert(pa.compute.is_in(tbl[self._dlt_index], remove_idx))) if new_value_compare(row_value, last_value).as_py() and row_value != last_value: # Last value has changed self.incremental_state['last_value'] = row_value diff --git a/dlt/helpers/dbt/__init__.py b/dlt/helpers/dbt/__init__.py index c6107b2873..3acdc84c7b 100644 --- a/dlt/helpers/dbt/__init__.py +++ b/dlt/helpers/dbt/__init__.py @@ -18,6 +18,7 @@ "athena": "athena-community", "motherduck": "duckdb", "mssql": "sqlserver", + "synapse": "sqlserver", } diff --git a/docs/website/blog/2023-05-26-structured-data-lakes-through-schema-evolution-next-generation-data-platform.md b/docs/website/blog/2023-05-26-structured-data-lakes-through-schema-evolution-next-generation-data-platform.md index d47b9a5b72..ef6b028ba6 100644 --- a/docs/website/blog/2023-05-26-structured-data-lakes-through-schema-evolution-next-generation-data-platform.md +++ b/docs/website/blog/2023-05-26-structured-data-lakes-through-schema-evolution-next-generation-data-platform.md @@ -107,6 +107,6 @@ To try out schema evolution with `dlt`, check out our [colab demo.](https://cola ### Want more? -- Join our [Slack](https://join.slack.com/t/dlthub-community/shared_invite/zt-1slox199h-HAE7EQoXmstkP_bTqal65g) +- Join our [Slack](https://join.slack.com/t/dlthub-community/shared_invite/zt-1n5193dbq-rCBmJ6p~ckpSFK4hCF2dYA) - Read our [schema evolution blog post](https://dlthub.com/docs/blog/schema-evolution) - Stay tuned for the next article in the series: *How to do schema evolution with* `dlt` *in the most effective way* \ No newline at end of file diff --git a/docs/website/blog/2023-06-21-open-api-spec-for-dlt-init.md b/docs/website/blog/2023-06-21-open-api-spec-for-dlt-init.md index 3e87615324..831643f36e 100644 --- a/docs/website/blog/2023-06-21-open-api-spec-for-dlt-init.md +++ b/docs/website/blog/2023-06-21-open-api-spec-for-dlt-init.md @@ -45,4 +45,4 @@ Anyway: [Try it out yourself!](https://github.com/dlt-hub/dlt-init-openapi) - **More heuristics:** Many more heuristics to extract resources, their dependencies, infer the incremental and merge loading. 
- **Tight integration with FastAPI on the code level to get even more heuristics!** -[Your feedback and help is greatly appreciated.](https://github.com/dlt-hub/dlt/blob/devel/CONTRIBUTING.md) [Join our community, and let’s build together.](https://join.slack.com/t/dlthub-community/shared_invite/zt-1slox199h-HAE7EQoXmstkP_bTqal65g) \ No newline at end of file +[Your feedback and help is greatly appreciated.](https://github.com/dlt-hub/dlt/blob/devel/CONTRIBUTING.md) [Join our community, and let’s build together.](https://join.slack.com/t/dlthub-community/shared_invite/zt-1n5193dbq-rCBmJ6p~ckpSFK4hCF2dYA) \ No newline at end of file diff --git a/docs/website/blog/2023-08-21-dlt-lineage-support.md b/docs/website/blog/2023-08-21-dlt-lineage-support.md index 3fc3c47cef..494b6b7693 100644 --- a/docs/website/blog/2023-08-21-dlt-lineage-support.md +++ b/docs/website/blog/2023-08-21-dlt-lineage-support.md @@ -124,4 +124,4 @@ In summary, the integration of lineage through `dlt` empowers organizations to c ## Start using dlt today What are you waiting for? * Dive into our [getting started docs](https://dlthub.com/docs/getting-started) -* [Join the slack community for discussion and help!](https://join.slack.com/t/dlthub-community/shared_invite/zt-1slox199h-HAE7EQoXmstkP_bTqal65g) \ No newline at end of file +* [Join the slack community for discussion and help!](https://join.slack.com/t/dlthub-community/shared_invite/zt-1n5193dbq-rCBmJ6p~ckpSFK4hCF2dYA) \ No newline at end of file diff --git a/docs/website/blog/2023-08-24-dlt-etlt.md b/docs/website/blog/2023-08-24-dlt-etlt.md index 72c640ceaf..4da8d03021 100644 --- a/docs/website/blog/2023-08-24-dlt-etlt.md +++ b/docs/website/blog/2023-08-24-dlt-etlt.md @@ -217,4 +217,4 @@ Fundamentally, we all agree it's all ETL, with the flavors simply designating sp ## Start using `dlt` today What are you waiting for? * Dive into our [getting started docs](https://dlthub.com/docs/getting-started) -* [Join the ⭐Slack Community⭐ for discussion and help!](https://join.slack.com/t/dlthub-community/shared_invite/zt-1slox199h-HAE7EQoXmstkP_bTqal65g) +* [Join the ⭐Slack Community⭐ for discussion and help!](https://join.slack.com/t/dlthub-community/shared_invite/zt-1n5193dbq-rCBmJ6p~ckpSFK4hCF2dYA) diff --git a/docs/website/blog/2023-09-05-mongo-etl.md b/docs/website/blog/2023-09-05-mongo-etl.md index 5fe0f0fd41..8ce22a5d2c 100644 --- a/docs/website/blog/2023-09-05-mongo-etl.md +++ b/docs/website/blog/2023-09-05-mongo-etl.md @@ -199,4 +199,4 @@ Read more about it here: What are you waiting for? - Dive into our [Getting Started.](https://dlthub.com/docs/getting-started) -- [Join the ⭐Slack Community⭐ for discussion and help!](https://join.slack.com/t/dlthub-community/shared_invite/zt-1slox199h-HAE7EQoXmstkP_bTqal65g) +- [Join the ⭐Slack Community⭐ for discussion and help!](https://join.slack.com/t/dlthub-community/shared_invite/zt-1n5193dbq-rCBmJ6p~ckpSFK4hCF2dYA) diff --git a/docs/website/blog/2023-09-26-verba-dlt-zendesk.md b/docs/website/blog/2023-09-26-verba-dlt-zendesk.md index b075dd9163..bf22d2c9ec 100644 --- a/docs/website/blog/2023-09-26-verba-dlt-zendesk.md +++ b/docs/website/blog/2023-09-26-verba-dlt-zendesk.md @@ -293,4 +293,4 @@ In this blog post, we've built a RAG application for Zendesk Support using Verba ## Let's stay in touch -If you have any questions or feedback, please reach out to us on the [dltHub Slack](https://join.slack.com/t/dlthub-community/shared_invite/zt-1slox199h-HAE7EQoXmstkP_bTqal65g). 
+If you have any questions or feedback, please reach out to us on the [dltHub Slack](https://join.slack.com/t/dlthub-community/shared_invite/zt-1n5193dbq-rCBmJ6p~ckpSFK4hCF2dYA). diff --git a/docs/website/blog/2023-10-06-dlt-holistics.md b/docs/website/blog/2023-10-06-dlt-holistics.md index 71bcd9b0d4..1480fcc8a0 100644 --- a/docs/website/blog/2023-10-06-dlt-holistics.md +++ b/docs/website/blog/2023-10-06-dlt-holistics.md @@ -463,7 +463,7 @@ This modern data stack offers an efficient and effective way to bridge the gap b ## Additional Resources: -- Want to discuss `dlt`? Join the `dlt` [Slack Community](https://join.slack.com/t/dlthub-community/shared_invite/zt-1slox199h-HAE7EQoXmstkP_bTqal65g) +- Want to discuss `dlt`? Join the `dlt` [Slack Community](https://join.slack.com/t/dlthub-community/shared_invite/zt-1n5193dbq-rCBmJ6p~ckpSFK4hCF2dYA) - Check out our friends over at [Holistics](https://www.holistics.io/). - [`dlt` MongoDB Source](https://dlthub.com/docs/dlt-ecosystem/verified-sources/mongodb). - Holistics 4.0: [Analytics as Code](https://docs.holistics.io/as-code/get-started). diff --git a/docs/website/blog/2023-10-10-data-product-docs.md b/docs/website/blog/2023-10-10-data-product-docs.md index f69ffad986..d4f646b282 100644 --- a/docs/website/blog/2023-10-10-data-product-docs.md +++ b/docs/website/blog/2023-10-10-data-product-docs.md @@ -109,4 +109,4 @@ Stop thinking about data, code and docs in isolation - they do not function inde Want to create data products with dlt? What are you waiting for? - Dive into our [Getting Started.](https://dlthub.com/docs/getting-started) -- [Join the ⭐Slack Community⭐ for discussion and help!](https://join.slack.com/t/dlthub-community/shared_invite/zt-1slox199h-HAE7EQoXmstkP_bTqal65g) +- [Join the ⭐Slack Community⭐ for discussion and help!](https://join.slack.com/t/dlthub-community/shared_invite/zt-1n5193dbq-rCBmJ6p~ckpSFK4hCF2dYA) diff --git a/docs/website/blog/2023-10-16-first-data-warehouse.md b/docs/website/blog/2023-10-16-first-data-warehouse.md index 12a97919c5..a9c10692f0 100644 --- a/docs/website/blog/2023-10-16-first-data-warehouse.md +++ b/docs/website/blog/2023-10-16-first-data-warehouse.md @@ -168,4 +168,4 @@ If you're building on Google Cloud Platform (GCP), here are some tutorials and r Want to discuss dlt and data lakes or warehouses? - Dive into our [Getting Started.](https://dlthub.com/docs/getting-started) -- [Join the ⭐Slack Community⭐ for discussion and help!](https://join.slack.com/t/dlthub-community/shared_invite/zt-1slox199h-HAE7EQoXmstkP_bTqal65g) +- [Join the ⭐Slack Community⭐ for discussion and help!](https://join.slack.com/t/dlthub-community/shared_invite/zt-1n5193dbq-rCBmJ6p~ckpSFK4hCF2dYA) diff --git a/docs/website/blog/2023-10-19-dbt-runners.md b/docs/website/blog/2023-10-19-dbt-runners.md index 2252752a1d..8237cc7ac5 100644 --- a/docs/website/blog/2023-10-19-dbt-runners.md +++ b/docs/website/blog/2023-10-19-dbt-runners.md @@ -238,6 +238,6 @@ Having the 2 options to run Cloud or Core versions of dbt enables better integra Want more? -- [Join the ⭐Slack Community⭐ for discussion and help!](https://join.slack.com/t/dlthub-community/shared_invite/zt-1slox199h-HAE7EQoXmstkP_bTqal65g) +- [Join the ⭐Slack Community⭐ for discussion and help!](https://join.slack.com/t/dlthub-community/shared_invite/zt-1n5193dbq-rCBmJ6p~ckpSFK4hCF2dYA) - Dive into our [Getting Started.](https://dlthub.com/docs/getting-started) - Star us on [GitHub](https://github.com/dlt-hub/dlt)! 
diff --git a/docs/website/docs/dlt-ecosystem/verified-sources/arrow-pandas.md b/docs/website/docs/dlt-ecosystem/verified-sources/arrow-pandas.md index 5840fb3049..85796b0333 100644 --- a/docs/website/docs/dlt-ecosystem/verified-sources/arrow-pandas.md +++ b/docs/website/docs/dlt-ecosystem/verified-sources/arrow-pandas.md @@ -85,6 +85,10 @@ pipeline.run(orders) pipeline.run(orders) ``` +:::tip +Look at the [Connector X + Arrow Example](../../examples/connector_x_arrow/) to see how to load data from production databases fast. +::: + ## Supported Arrow data types The Arrow data types are translated to dlt data types as follows: diff --git a/docs/website/docs/general-usage/destination-tables.md b/docs/website/docs/general-usage/destination-tables.md index 8f95639f87..8e1f771e47 100644 --- a/docs/website/docs/general-usage/destination-tables.md +++ b/docs/website/docs/general-usage/destination-tables.md @@ -163,6 +163,11 @@ case the primary key or other unique columns are defined. During a pipeline run, dlt [normalizes both table and column names](schema.md#naming-convention) to ensure compatibility with the destination database's accepted format. All names from your source data will be transformed into snake_case and will only include alphanumeric characters. Please be aware that the names in the destination database may differ somewhat from those in your original input. +### Variant columns +If your data has inconsistent types, `dlt` will dispatch the data to several **variant columns**. For example, if you have a resource (i.e., a JSON file) with a field named **answer** and your data contains boolean values, you will get a column named **answer** of type **BOOLEAN** in your destination. If, for some reason, the next load brings an integer value and a string value for **answer**, the inconsistent data will go to the **answer__v_bigint** and **answer__v_text** columns respectively. +The general naming rule for variant columns is `original_name__v_type`, where `original_name` is the existing column name (with the data type clash) and `type` is the name of the data type stored in the variant. + + ## Load Packages and Load IDs Each execution of the pipeline generates one or more load packages. A load package typically contains data retrieved from diff --git a/docs/website/docs/general-usage/schema.md b/docs/website/docs/general-usage/schema.md index e27a87e803..ee73aea54e 100644 --- a/docs/website/docs/general-usage/schema.md +++ b/docs/website/docs/general-usage/schema.md @@ -243,12 +243,12 @@ data itself. The `dlt.source` decorator accepts a schema instance that you can create yourself and modify in whatever way you wish. The decorator also support a few typical use cases: -### 1. Schema created implicitly by decorator +### Schema created implicitly by decorator If no schema instance is passed, the decorator creates a schema with the name set to source name and all the settings to default. -### 2. Automatically load schema file stored with source python module +### Automatically load schema file stored with source python module If no schema instance is passed, and a file with a name `{source name}_schema.yml` exists in the same folder as the module with the decorated function, it will be automatically loaded and used as @@ -256,7 +256,7 @@ the schema. This should make easier to bundle a fully specified (or pre-configured) schema with a source. -### 3. 
Schema is modified in the source function body +### Schema is modified in the source function body What if you can configure your schema or add some tables only inside your schema function, when i.e. you have the source credentials and user settings available? You could for example add detailed @@ -264,7 +264,7 @@ schemas of all the database tables when someone requests a table data to be load is available only at the moment source function is called. Similarly to the `source_state()` and `resource_state()` , source and resource function has current -schema available via `dlt.current.source_schema`. +schema available via `dlt.current.source_schema()`. Example: @@ -273,9 +273,10 @@ Example: def textual(nesting_level: int): # get the source schema from the `current` context schema = dlt.current.source_schema() - # remove date detector and add type detector that forces all fields to strings - schema._settings["detections"].remove("iso_timestamp") - schema._settings["detections"].insert(0, "all_text") + # remove date detector + schema.remove_type_detection("iso_timestamp") + # convert UNIX timestamp (float, within a year from NOW) into timestamp + schema.add_type_detection("timestamp") schema.compile_settings() return dlt.resource(...) diff --git a/pyproject.toml b/pyproject.toml index adf6ff9b3f..d8787ecce3 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -98,6 +98,7 @@ athena = ["pyathena", "pyarrow", "s3fs", "botocore"] weaviate = ["weaviate-client"] pydantic = ["pydantic"] mssql = ["pyodbc"] +synapse = ["pyodbc"] [tool.poetry.scripts] dlt = "dlt.cli._dlt:_main" diff --git a/tests/.dlt/config.toml b/tests/.dlt/config.toml index 1eac35d306..3d13d55478 100644 --- a/tests/.dlt/config.toml +++ b/tests/.dlt/config.toml @@ -1,5 +1,6 @@ [runtime] -sentry_dsn="https://6f6f7b6f8e0f458a89be4187603b55fe@o1061158.ingest.sentry.io/4504819859914752" +# TODO add back sentry +sentry_dsn="" [tests] bucket_url_gs="gs://ci-test-bucket" diff --git a/tests/common/schema/test_inference.py b/tests/common/schema/test_inference.py index be58adab53..24c97219fc 100644 --- a/tests/common/schema/test_inference.py +++ b/tests/common/schema/test_inference.py @@ -372,7 +372,8 @@ def test_corece_null_value_over_not_null(schema: Schema) -> None: def test_infer_with_autodetection(schema: Schema) -> None: - c = schema._infer_column("ts", pendulum.now().timestamp()) + # iso timestamp detection + c = schema._infer_column("ts", pendulum.now().isoformat()) assert c["data_type"] == "timestamp" schema._type_detections = [] c = schema._infer_column("ts", pendulum.now().timestamp()) diff --git a/tests/common/test_data_writers/test_buffered_writer.py b/tests/common/test_data_writers/test_buffered_writer.py index 59a99a4ca6..7ee72ff8e3 100644 --- a/tests/common/test_data_writers/test_buffered_writer.py +++ b/tests/common/test_data_writers/test_buffered_writer.py @@ -2,7 +2,6 @@ from typing import Iterator import pytest -from dlt.common.arithmetics import Decimal from dlt.common.data_writers.buffered import BufferedDataWriter, DataWriter from dlt.common.data_writers.exceptions import BufferedDataWriterClosed @@ -16,7 +15,10 @@ import datetime # noqa: 251 -def get_insert_writer(_format: TLoaderFileFormat = "insert_values", buffer_max_items: int = 10, disable_compression: bool = False) -> BufferedDataWriter[DataWriter]: +ALL_WRITERS: Set[Literal[TLoaderFileFormat]] = {"insert_values", "jsonl", "parquet", "arrow", "puae-jsonl"} + + +def get_writer(_format: TLoaderFileFormat = "insert_values", buffer_max_items: int = 10, 
disable_compression: bool = False) -> BufferedDataWriter[DataWriter]: caps = DestinationCapabilitiesContext.generic_capabilities() caps.preferred_loader_file_format = _format file_template = os.path.join(TEST_STORAGE_ROOT, f"{_format}.%s") @@ -24,7 +26,7 @@ def get_insert_writer(_format: TLoaderFileFormat = "insert_values", buffer_max_i def test_write_no_item() -> None: - with get_insert_writer() as writer: + with get_writer() as writer: pass assert writer.closed with pytest.raises(BufferedDataWriterClosed): diff --git a/tests/extract/test_incremental.py b/tests/extract/test_incremental.py index 5f1ab9279f..2b6d26ba12 100644 --- a/tests/extract/test_incremental.py +++ b/tests/extract/test_incremental.py @@ -351,7 +351,7 @@ def test_composite_primary_key(item_type: TItemFormat) -> None: {'created_at': 2, 'isrc': 'AAA', 'market': 'DE'}, {'created_at': 2, 'isrc': 'CCC', 'market': 'DE'}, {'created_at': 2, 'isrc': 'DDD', 'market': 'DE'}, - {'created_at': 2, 'isrc': 'CCC', 'market': 'DE'}, + {'created_at': 1, 'isrc': 'CCC', 'market': 'DE'}, ] source_items = data_to_item_format(item_type, data) @@ -366,8 +366,8 @@ def some_data(created_at=dlt.sources.incremental('created_at')): with c.execute_query("SELECT created_at, isrc, market FROM some_data order by created_at, isrc, market") as cur: rows = cur.fetchall() - expected = [(1, 'AAA', 'DE'), (2, 'AAA', 'DE'), (2, 'BBB', 'DE'), (2, 'CCC', 'DE'), (2, 'CCC', 'US'), (2, 'DDD', 'DE')] - assert rows == expected + expected = {(1, 'AAA', 'DE'), (2, 'AAA', 'DE'), (2, 'BBB', 'DE'), (2, 'CCC', 'DE'), (2, 'CCC', 'US'), (2, 'DDD', 'DE'), (1, 'CCC', 'DE')} + assert set(rows) == expected @pytest.mark.parametrize("item_type", ALL_ITEM_FORMATS) @@ -520,9 +520,6 @@ def some_data(last_timestamp=dlt.sources.incremental("item.timestamp")): assert py_ex.value.json_path == "item.timestamp" - - - @pytest.mark.parametrize("item_type", ALL_ITEM_FORMATS) def test_filter_processed_items(item_type: TItemFormat) -> None: """Checks if already processed items are filtered out""" diff --git a/tests/extract/utils.py b/tests/extract/utils.py index 2465a1b1e2..b109cdbdd9 100644 --- a/tests/extract/utils.py +++ b/tests/extract/utils.py @@ -1,12 +1,11 @@ -from typing import Any, Optional, List, Union, Literal, get_args +from typing import Any, Optional, List, Literal, get_args import pytest -from itertools import zip_longest, chain +from itertools import zip_longest -from dlt.common.typing import TDataItem, TDataItems, TAny +from dlt.common.typing import TDataItem, TDataItems from dlt.extract.extract import ExtractorStorage -from dlt.extract.typing import ItemTransform, ItemTransformFunc -from tests.cases import TArrowFormat +from dlt.extract.typing import ItemTransform import pandas as pd from dlt.common.libs.pyarrow import pyarrow as pa diff --git a/tests/load/filesystem/test_aws_credentials.py b/tests/load/filesystem/test_aws_credentials.py index bf9e0bd681..b4f367b4e1 100644 --- a/tests/load/filesystem/test_aws_credentials.py +++ b/tests/load/filesystem/test_aws_credentials.py @@ -52,6 +52,18 @@ def test_aws_credentials_from_botocore(environment: Dict[str, str]) -> None: assert c.is_resolved() assert not c.is_partial() + s3_cred = c.to_s3fs_credentials() + assert s3_cred == { + "key": "fake_access_key", + "secret": "fake_secret_key", + "token": "fake_session_token", + "profile": None, + "endpoint_url": None, + "client_kwargs": { + "region_name": session.get_config_variable('region') + } + } + c = AwsCredentials() c.parse_native_representation(botocore.session.get_session()) 
assert c.is_resolved() @@ -125,3 +137,4 @@ def set_aws_credentials_env(environment: Dict[str, str]) -> None: environment['AWS_ACCESS_KEY_ID'] = 'fake_access_key' environment['AWS_SECRET_ACCESS_KEY'] = 'fake_secret_key' environment['AWS_SESSION_TOKEN'] = 'fake_session_token' + environment['REGION_NAME'] = 'eu-central-1' diff --git a/tests/load/pipeline/test_pipelines.py b/tests/load/pipeline/test_pipelines.py index 99071a7ac6..b23ad64656 100644 --- a/tests/load/pipeline/test_pipelines.py +++ b/tests/load/pipeline/test_pipelines.py @@ -29,6 +29,7 @@ @pytest.mark.parametrize("destination_config", destinations_configs(default_sql_configs=True, all_buckets_filesystem_configs=True), ids=lambda x: x.name) @pytest.mark.parametrize('use_single_dataset', [True, False]) def test_default_pipeline_names(use_single_dataset: bool, destination_config: DestinationTestConfiguration) -> None: + print(f"\n\nTesting with use_single_dataset: {use_single_dataset}, destination_config: {destination_config}") destination_config.setup() p = dlt.pipeline() p.config.use_single_dataset = use_single_dataset @@ -41,6 +42,8 @@ def test_default_pipeline_names(use_single_dataset: bool, destination_config: De assert p.destination is None assert p.default_schema_name is None + print(f"\nAfter setup, pipeline_name: {p.pipeline_name}, dataset_name: {p.dataset_name}, default_schema_name: {p.default_schema_name}") + data = ["a", "b", "c"] with pytest.raises(PipelineStepFailed) as step_ex: p.extract(data) @@ -50,13 +53,20 @@ def test_default_pipeline_names(use_single_dataset: bool, destination_config: De def data_fun() -> Iterator[Any]: yield data + print(f"\nData function defined: {data_fun}") + + # this will create default schema p.extract(data_fun) + print(f"\nAfter first extract, default_schema_name: {p.default_schema_name}, schemas: {p.schemas}") + # _pipeline suffix removed when creating default schema name assert p.default_schema_name in ["dlt_pytest", "dlt"] # this will create additional schema p.extract(data_fun(), schema=dlt.Schema("names")) + print(f"\nAfter second extract, default_schema_name: {p.default_schema_name}, schemas: {p.schemas}") + assert p.default_schema_name in ["dlt_pytest", "dlt"] assert "names" in p.schemas.keys() @@ -78,18 +88,24 @@ def data_fun() -> Iterator[Any]: assert p.dataset_name in possible_dataset_names p.normalize() info = p.load(dataset_name="d" + uniq_id()) + print(f"\nAfter load, dataset_name: {p.dataset_name}, info: {info}") + print(p.dataset_name) assert info.pipeline is p # two packages in two different schemas were loaded assert len(info.loads_ids) == 2 + print(f"Before assert_table, use_single_dataset: {use_single_dataset}") # if loaded to single data, double the data was loaded to a single table because the schemas overlapped if use_single_dataset: assert_table(p, "data_fun", sorted(data * 2), info=info) else: + print(f"\nNot single data set...here's the data: {data}") + print(f"here's the info: {info}") # loaded to separate data sets assert_table(p, "data_fun", data, info=info) assert_table(p, "data_fun", data, schema_name="names", info=info) + print(f"\nEnd of test\n\n") @pytest.mark.parametrize("destination_config", destinations_configs(default_sql_configs=True, all_buckets_filesystem_configs=True), ids=lambda x: x.name) diff --git a/tests/load/pipeline/utils.py b/tests/load/pipeline/utils.py index 34c0b3a210..96f25b6256 100644 --- a/tests/load/pipeline/utils.py +++ b/tests/load/pipeline/utils.py @@ -69,6 +69,9 @@ def _is_filesystem(p: dlt.Pipeline) -> bool: def assert_table(p: 
dlt.Pipeline, table_name: str, table_data: List[Any], schema_name: str = None, info: LoadInfo = None) -> None: + print("WITHIN ASSERT_TABLE, here are params going to assert_table_sql table data: " + str(table_data)) + print("WITHIN ASSERT_TABLE, pipeline: " + str(p)) + print("WITHIN ASSERT_TABLE, table_name: " + str(table_name)) func = _assert_table_fs if _is_filesystem(p) else _assert_table_sql func(p, table_name, table_data, schema_name, info) @@ -76,8 +79,14 @@ def assert_table(p: dlt.Pipeline, table_name: str, table_data: List[Any], schema def _assert_table_sql(p: dlt.Pipeline, table_name: str, table_data: List[Any], schema_name: str = None, info: LoadInfo = None) -> None: with p.sql_client(schema_name=schema_name) as c: table_name = c.make_qualified_table_name(table_name) + print(f"Qualified table name: {table_name}") # Print the qualified table name + query = f"SELECT * FROM {table_name} ORDER BY 1" # Constructing the query + print(f"Executing query: {query}") # Print the query being executed + print(f"Expected table data: {table_data}") # Print the expected table data + print(f"Schema name: {schema_name}") # Print the schema name # Implement NULLS FIRST sort in python - assert_query_data(p, f"SELECT * FROM {table_name} ORDER BY 1", table_data, schema_name, info, sort_key=lambda row: row[0] is not None) + assert_query_data(p, query, table_data, schema_name, info, sort_key=lambda row: row[0] is not None) + def _assert_table_fs(p: dlt.Pipeline, table_name: str, table_data: List[Any], schema_name: str = None, info: LoadInfo = None) -> None: @@ -108,6 +117,14 @@ def assert_query_data(p: dlt.Pipeline, sql: str, table_data: List[Any], schema_n """ rows = select_data(p, sql, schema_name) + print("HERE IS WHERE ROWS GETS CALCED") + print("P: " + str(p)) + print("sql: " + str(sql)) + print("schema_name: " + str(schema_name)) + + print("HERE ARE THE ROWS FOR ASSERTION: " + str(rows)) + print("HERE IS THE TABLE DATA FOR ASSERTION: " + str(table_data)) + assert len(rows) == len(table_data) if sort_key is not None: rows = sorted(rows, key=sort_key) diff --git a/tests/load/test_sql_client.py b/tests/load/test_sql_client.py index 5c6e1b9e31..628d0f818e 100644 --- a/tests/load/test_sql_client.py +++ b/tests/load/test_sql_client.py @@ -183,6 +183,9 @@ def test_execute_df(client: SqlJobClientBase) -> None: elif client.config.destination_name == "mssql": chunk_size = 700 total_records = 1000 + elif client.config.destination_name == "synapse": + chunk_size = 700 + total_records = 1000 else: chunk_size = 2048 total_records = 3000 @@ -190,9 +193,25 @@ def test_execute_df(client: SqlJobClientBase) -> None: client.update_stored_schema() table_name = prepare_temp_table(client) f_q_table_name = client.sql_client.make_qualified_table_name(table_name) - insert_query = ",".join([f"({idx})" for idx in range(0, total_records)]) - client.sql_client.execute_sql(f"INSERT INTO {f_q_table_name} VALUES {insert_query};") + # TODO remove synapse handling + def generate_synapse_insert_query(total_records, is_synapse): + if is_synapse: + return " UNION ALL ".join([f"SELECT {idx}" for idx in range(0, total_records)]) + else: + return ",".join([f"({idx})" for idx in range(0, total_records)]) + + is_synapse = client.config.destination_name == "synapse" + insert_query = generate_synapse_insert_query(total_records, is_synapse) + + if is_synapse: + sql_statement = f"INSERT INTO {f_q_table_name} (col) {insert_query};" + else: + sql_statement = f"INSERT INTO {f_q_table_name} VALUES {insert_query};" + + #print(f"Executing SQL: 
{sql_statement}") + client.sql_client.execute_sql(sql_statement) + with client.sql_client.execute_query(f"SELECT * FROM {f_q_table_name} ORDER BY col ASC") as curr: df = curr.df() # Force lower case df columns, snowflake has all cols uppercase diff --git a/tests/load/utils.py b/tests/load/utils.py index be2097c879..a17a654770 100644 --- a/tests/load/utils.py +++ b/tests/load/utils.py @@ -147,6 +147,7 @@ def destinations_configs( DestinationTestConfiguration(destination="snowflake", staging="filesystem", file_format="jsonl", bucket_url=AWS_BUCKET, stage_name="PUBLIC.dlt_s3_stage", extra_info="s3-integration"), DestinationTestConfiguration(destination="snowflake", staging="filesystem", file_format="jsonl", bucket_url=AZ_BUCKET, stage_name="PUBLIC.dlt_az_stage", extra_info="az-integration"), DestinationTestConfiguration(destination="snowflake", staging="filesystem", file_format="jsonl", bucket_url=AZ_BUCKET, extra_info="az-authorization"), + DestinationTestConfiguration(destination="synapse", staging="filesystem", file_format="parquet", bucket_url=AZ_BUCKET, stage_name="PUBLIC.dlt_az_stage") ] if all_staging_configs: @@ -155,6 +156,8 @@ def destinations_configs( DestinationTestConfiguration(destination="snowflake", staging="filesystem", file_format="parquet", bucket_url=AWS_BUCKET, extra_info="credential-forwarding"), DestinationTestConfiguration(destination="redshift", staging="filesystem", file_format="jsonl", bucket_url=AWS_BUCKET, extra_info="credential-forwarding"), DestinationTestConfiguration(destination="bigquery", staging="filesystem", file_format="jsonl", bucket_url=GCS_BUCKET, extra_info="gcs-authorization"), + DestinationTestConfiguration(destination="synapse", staging="filesystem", file_format="parquet", bucket_url=AZ_BUCKET, extra_info="az-integration") + ] # add local filesystem destinations if requested diff --git a/tests/pipeline/test_arrow_sources.py b/tests/pipeline/test_arrow_sources.py index 759d5735c9..016eb1ee48 100644 --- a/tests/pipeline/test_arrow_sources.py +++ b/tests/pipeline/test_arrow_sources.py @@ -1,17 +1,13 @@ +import os import pytest import pandas as pd -from typing import Any, Union import pyarrow as pa import dlt -from dlt.common import Decimal from dlt.common.utils import uniq_id -from dlt.common.exceptions import TerminalValueError from dlt.pipeline.exceptions import PipelineStepFailed from tests.cases import arrow_table_all_data_types, TArrowFormat -from dlt.common.storages import LoadStorage - @pytest.mark.parametrize( @@ -111,3 +107,35 @@ def map_func(item): assert len(result) == 1 assert result[0]['int'][0].as_py() == 1 + + +@pytest.mark.parametrize("item_type", ["pandas", "table", "record_batch"]) +def test_extract_normalize_file_rotation(item_type: TArrowFormat) -> None: + # do not extract state + os.environ["RESTORE_FROM_DESTINATION"] = "False" + # use parquet for dummy + os.environ["DESTINATION__LOADER_FILE_FORMAT"] = "parquet" + + pipeline_name = "arrow_" + uniq_id() + pipeline = dlt.pipeline(pipeline_name=pipeline_name, destination="dummy") + + item, rows = arrow_table_all_data_types(item_type) + + @dlt.resource + def data_frames(): + for _ in range(10): + yield item + + # get buffer written and file rotated with each yielded frame + os.environ[f"SOURCES__{pipeline_name.upper()}__DATA_WRITER__BUFFER_MAX_ITEMS"] = str(len(rows)) + os.environ[f"SOURCES__{pipeline_name.upper()}__DATA_WRITER__FILE_MAX_ITEMS"] = str(len(rows)) + + pipeline.extract(data_frames()) + # ten parquet files + assert len(pipeline.list_extracted_resources()) == 10 + info = 
pipeline.normalize(workers=3) + # with 10 * num rows + assert info.row_counts["data_frames"] == 10 * len(rows) + load_id = pipeline.list_normalized_load_packages()[0] + # 10 jobs on parquet files + assert len(pipeline.get_load_package_info(load_id).jobs["new_jobs"]) == 10 diff --git a/tests/pipeline/test_pipeline.py b/tests/pipeline/test_pipeline.py index 48368a9df0..917dac75c4 100644 --- a/tests/pipeline/test_pipeline.py +++ b/tests/pipeline/test_pipeline.py @@ -9,7 +9,7 @@ import pytest import dlt -from dlt.common import json, sleep +from dlt.common import json, sleep, pendulum from dlt.common.configuration.container import Container from dlt.common.configuration.specs.aws_credentials import AwsCredentials from dlt.common.configuration.specs.exceptions import NativeValueError @@ -19,7 +19,7 @@ from dlt.common.exceptions import DestinationHasFailedJobs, DestinationTerminalException, PipelineStateNotAvailable, UnknownDestinationModule from dlt.common.pipeline import PipelineContext from dlt.common.runtime.collector import AliveCollector, EnlightenCollector, LogCollector, TqdmCollector -from dlt.common.schema.utils import new_column +from dlt.common.schema.utils import new_column, new_table from dlt.common.utils import uniq_id from dlt.extract.exceptions import InvalidResourceDataTypeBasic, PipeGenInvalid, SourceExhausted @@ -1142,3 +1142,63 @@ def generic(start=8): pipeline = dlt.pipeline(destination='duckdb') pipeline.run(generic(), loader_file_format=file_format) + + +def test_remove_autodetect() -> None: + now = pendulum.now() + + @dlt.source + def autodetect(): + # add unix ts autodetection to current source schema + dlt.current.source_schema().add_type_detection("timestamp") + return dlt.resource([int(now.timestamp()), int(now.timestamp() + 1), int(now.timestamp() + 2)], name="numbers") + + pipeline = dlt.pipeline(destination='duckdb') + pipeline.run(autodetect()) + + # unix ts recognized + assert pipeline.default_schema.get_table("numbers")["columns"]["value"]["data_type"] == "timestamp" + + pipeline = pipeline.drop() + + source = autodetect() + source.schema.remove_type_detection("timestamp") + + pipeline = dlt.pipeline(destination='duckdb') + pipeline.run(source) + + assert pipeline.default_schema.get_table("numbers")["columns"]["value"]["data_type"] == "bigint" + + +def test_flattened_column_hint() -> None: + now = pendulum.now() + + # @dlt.resource(columns=[{"name": "value__timestamp", "data_type": "timestamp"}]) + @dlt.resource() + def flattened_dict(): + # dlt.current.source_schema().add_type_detection("timestamp") + + for delta in range(4): + yield {"delta": delta, "values": [{"Value": {"timestampValue": now.timestamp() + delta}}]} + + @dlt.source + def nested_resource(): + # we need to create a whole structure + dict_resource = flattened_dict() + # add table from resource + dlt.current.source_schema().update_table(dict_resource.compute_table_schema()) + values_table = new_table( + dict_resource.name + "__values", + parent_table_name=dict_resource.name, + columns=[{"name": "value__timestamp_value", "data_type": "timestamp"}] + ) + # and child table + dlt.current.source_schema().update_table(values_table) + return dict_resource + + pipeline = dlt.pipeline(destination='duckdb') + pipeline.run(nested_resource()) + # print(pipeline.default_schema.to_pretty_yaml()) + assert pipeline.default_schema.get_table("flattened_dict__values")["columns"]["value__timestamp_value"]["data_type"] == "timestamp" + # make sure data is there + assert 
pipeline.last_trace.last_normalize_info.row_counts["flattened_dict__values"] == 4 \ No newline at end of file diff --git a/tests/utils.py b/tests/utils.py index 2eba788542..e72bcd9a20 100644 --- a/tests/utils.py +++ b/tests/utils.py @@ -27,7 +27,7 @@ # destination constants -IMPLEMENTED_DESTINATIONS = {"athena", "duckdb", "bigquery", "redshift", "postgres", "snowflake", "filesystem", "weaviate", "dummy", "motherduck", "mssql"} +IMPLEMENTED_DESTINATIONS = {"athena", "duckdb", "bigquery", "redshift", "postgres", "snowflake", "filesystem", "weaviate", "dummy", "motherduck", "mssql", "synapse"} NON_SQL_DESTINATIONS = {"filesystem", "weaviate", "dummy", "motherduck"} SQL_DESTINATIONS = IMPLEMENTED_DESTINATIONS - NON_SQL_DESTINATIONS @@ -36,7 +36,9 @@ # filter out active destinations for current tests -ACTIVE_DESTINATIONS = set(dlt.config.get("ACTIVE_DESTINATIONS", list) or IMPLEMENTED_DESTINATIONS) +# TODO remove filter +# ACTIVE_DESTINATIONS = set(dlt.config.get("ACTIVE_DESTINATIONS", list) or IMPLEMENTED_DESTINATIONS) +ACTIVE_DESTINATIONS = {"synapse"} ACTIVE_SQL_DESTINATIONS = SQL_DESTINATIONS.intersection(ACTIVE_DESTINATIONS) ACTIVE_NON_SQL_DESTINATIONS = NON_SQL_DESTINATIONS.intersection(ACTIVE_DESTINATIONS)
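
The variant-column rules added to `destination-tables.md` above are easier to picture with a concrete run. The following is a minimal sketch, not part of the diff: it assumes a local `duckdb` destination and a hypothetical `survey` table, and mirrors the **answer** example from the docs text.

```py
# Minimal sketch of dlt variant columns, assuming a local duckdb destination.
# Pipeline, dataset and table names are made up for illustration.
import dlt

pipeline = dlt.pipeline(
    pipeline_name="variant_demo",
    destination="duckdb",
    dataset_name="variant_demo_data",
)

# first load: "answer" arrives as a boolean -> column `answer` of type BOOLEAN
pipeline.run([{"id": 1, "answer": True}], table_name="survey")

# later load: "answer" arrives as an integer and as text -> the original BOOLEAN
# column is kept and variant columns `answer__v_bigint` / `answer__v_text` appear
pipeline.run([{"id": 2, "answer": 42}, {"id": 3, "answer": "maybe"}], table_name="survey")

print(list(pipeline.default_schema.get_table("survey")["columns"]))
```

If the behavior matches the docs, the printed column list should contain `answer`, `answer__v_bigint` and `answer__v_text` alongside `id` and the `_dlt_*` bookkeeping columns.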
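
The `client_kwargs` entry asserted in `test_aws_credentials_from_botocore` is the s3fs-style credentials dict with a region attached. A hedged sketch of how such a dict could be consumed follows; the bucket name and region value are made up, and the dict shape is copied from the test expectation above.

```py
# Sketch only: pass an s3fs-style credentials dict (like the one returned by
# to_s3fs_credentials in the test above) to s3fs. Bucket and region are made up.
import s3fs

s3_cred = {
    "key": "fake_access_key",
    "secret": "fake_secret_key",
    "token": "fake_session_token",
    "profile": None,
    "endpoint_url": None,
    "client_kwargs": {"region_name": "eu-central-1"},
}

# drop the None entries and forward the rest as keyword arguments
fs = s3fs.S3FileSystem(**{k: v for k, v in s3_cred.items() if v is not None})
print(fs.ls("my-hypothetical-bucket"))
```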