diff --git a/.github/workflows/test_destination_databricks.yml b/.github/workflows/test_destination_databricks.yml new file mode 100644 index 0000000000..f301a1b9ed --- /dev/null +++ b/.github/workflows/test_destination_databricks.yml @@ -0,0 +1,88 @@ + +name: test databricks + +on: + pull_request: + branches: + - master + - devel + workflow_dispatch: + +env: + DLT_SECRETS_TOML: ${{ secrets.DLT_SECRETS_TOML }} + + RUNTIME__SENTRY_DSN: https://6f6f7b6f8e0f458a89be4187603b55fe@o1061158.ingest.sentry.io/4504819859914752 + RUNTIME__LOG_LEVEL: ERROR + + ACTIVE_DESTINATIONS: "[\"databricks\"]" + ALL_FILESYSTEM_DRIVERS: "[\"memory\"]" + +jobs: + get_docs_changes: + uses: ./.github/workflows/get_docs_changes.yml + if: ${{ !github.event.pull_request.head.repo.fork }} + + run_loader: + name: Tests Databricks loader + needs: get_docs_changes + if: needs.get_docs_changes.outputs.changes_outside_docs == 'true' + strategy: + fail-fast: false + matrix: + os: ["ubuntu-latest"] + # os: ["ubuntu-latest", "macos-latest", "windows-latest"] + defaults: + run: + shell: bash + runs-on: ${{ matrix.os }} + + steps: + + - name: Check out + uses: actions/checkout@master + + - name: Setup Python + uses: actions/setup-python@v4 + with: + python-version: "3.10.x" + + - name: Install Poetry + uses: snok/install-poetry@v1.3.2 + with: + virtualenvs-create: true + virtualenvs-in-project: true + installer-parallel: true + + - name: Load cached venv + id: cached-poetry-dependencies + uses: actions/cache@v3 + with: + path: .venv + key: venv-${{ runner.os }}-${{ steps.setup-python.outputs.python-version }}-${{ hashFiles('**/poetry.lock') }}-gcp + + - name: Install dependencies + run: poetry install --no-interaction -E databricks -E s3 -E gs -E az -E parquet --with sentry-sdk --with pipeline + + - name: create secrets.toml + run: pwd && echo "$DLT_SECRETS_TOML" > tests/.dlt/secrets.toml + + - run: | + poetry run pytest tests/load + if: runner.os != 'Windows' + name: Run tests Linux/MAC + - run: | + poetry run pytest tests/load + if: runner.os == 'Windows' + name: Run tests Windows + shell: cmd + + matrix_job_required_check: + name: Databricks loader tests + needs: run_loader + runs-on: ubuntu-latest + if: always() + steps: + - name: Check matrix job results + if: contains(needs.*.result, 'failure') || contains(needs.*.result, 'cancelled') + run: | + echo "One or more matrix job tests failed or were cancelled. You may need to re-run them." 
&& exit 1 diff --git a/dlt/common/configuration/specs/aws_credentials.py b/dlt/common/configuration/specs/aws_credentials.py index f6df1d8cce..ee7360e2cb 100644 --- a/dlt/common/configuration/specs/aws_credentials.py +++ b/dlt/common/configuration/specs/aws_credentials.py @@ -38,6 +38,13 @@ def to_native_representation(self) -> Dict[str, Optional[str]]: """Return a dict that can be passed as kwargs to boto3 session""" return dict(self) + def to_session_credentials(self) -> Dict[str, str]: + return dict( + aws_access_key_id=self.aws_access_key_id, + aws_secret_access_key=self.aws_secret_access_key, + aws_session_token=self.aws_session_token, + ) + @configspec class AwsCredentials(AwsCredentialsWithoutDefaults, CredentialsWithDefault): @@ -47,6 +54,23 @@ def on_partial(self) -> None: if self._from_session(session) and not self.is_partial(): self.resolve() + def to_session_credentials(self) -> Dict[str, str]: + """Return configured or new aws session token""" + if self.aws_session_token and self.aws_access_key_id and self.aws_secret_access_key: + return dict( + aws_access_key_id=self.aws_access_key_id, + aws_secret_access_key=self.aws_secret_access_key, + aws_session_token=self.aws_session_token, + ) + sess = self._to_botocore_session() + client = sess.create_client("sts") + token = client.get_session_token() + return dict( + aws_access_key_id=token["Credentials"]["AccessKeyId"], + aws_secret_access_key=token["Credentials"]["SecretAccessKey"], + aws_session_token=token["Credentials"]["SessionToken"], + ) + def _to_botocore_session(self) -> Any: try: import botocore.session diff --git a/dlt/common/data_writers/escape.py b/dlt/common/data_writers/escape.py index 5bf8f29ccb..f27b48a95f 100644 --- a/dlt/common/data_writers/escape.py +++ b/dlt/common/data_writers/escape.py @@ -124,3 +124,24 @@ def escape_snowflake_identifier(v: str) -> str: # Snowcase uppercase all identifiers unless quoted. Match this here so queries on information schema work without issue # See also https://docs.snowflake.com/en/sql-reference/identifiers-syntax#double-quoted-identifiers return escape_postgres_identifier(v.upper()) + + +escape_databricks_identifier = escape_bigquery_identifier + + +DATABRICKS_ESCAPE_DICT = {"'": "\\'", "\\": "\\\\", "\n": "\\n", "\r": "\\r"} + + +def escape_databricks_literal(v: Any) -> Any: + if isinstance(v, str): + return _escape_extended(v, prefix="'", escape_dict=DATABRICKS_ESCAPE_DICT) + if isinstance(v, (datetime, date, time)): + return f"'{v.isoformat()}'" + if isinstance(v, (list, dict)): + return _escape_extended(json.dumps(v), prefix="'", escape_dict=DATABRICKS_ESCAPE_DICT) + if isinstance(v, bytes): + return f"X'{v.hex()}'" + if v is None: + return "NULL" + + return str(v) diff --git a/dlt/common/destination/capabilities.py b/dlt/common/destination/capabilities.py index 2596b2bf99..b891d4b31f 100644 --- a/dlt/common/destination/capabilities.py +++ b/dlt/common/destination/capabilities.py @@ -52,6 +52,9 @@ class DestinationCapabilitiesContext(ContainerInjectableContext): schema_supports_numeric_precision: bool = True timestamp_precision: int = 6 max_rows_per_insert: Optional[int] = None + supports_multiple_statements: bool = True + supports_clone_table: bool = False + """Destination supports CREATE TABLE ... CLONE ... 
statements""" # do not allow to create default value, destination caps must be always explicitly inserted into container can_create_default: ClassVar[bool] = False @@ -77,4 +80,5 @@ def generic_capabilities( caps.is_max_text_data_type_length_in_bytes = True caps.supports_ddl_transactions = True caps.supports_transactions = True + caps.supports_multiple_statements = True return caps diff --git a/dlt/common/time.py b/dlt/common/time.py index ed390c28bf..4f4dd05ef0 100644 --- a/dlt/common/time.py +++ b/dlt/common/time.py @@ -138,6 +138,43 @@ def ensure_pendulum_time(value: Union[str, datetime.time]) -> pendulum.Time: raise TypeError(f"Cannot coerce {value} to a pendulum.Time object.") +def to_py_datetime(value: datetime.datetime) -> datetime.datetime: + """Convert a pendulum.DateTime to a py datetime object. + + Args: + value: The value to convert. Can be a pendulum.DateTime or datetime. + + Returns: + A py datetime object + """ + if isinstance(value, pendulum.DateTime): + return datetime.datetime( + value.year, + value.month, + value.day, + value.hour, + value.minute, + value.second, + value.microsecond, + value.tzinfo, + ) + return value + + +def to_py_date(value: datetime.date) -> datetime.date: + """Convert a pendulum.Date to a py date object. + + Args: + value: The value to convert. Can be a pendulum.Date or date. + + Returns: + A py date object + """ + if isinstance(value, pendulum.Date): + return datetime.date(value.year, value.month, value.day) + return value + + def _datetime_from_ts_or_iso( value: Union[int, float, str] ) -> Union[pendulum.DateTime, pendulum.Date, pendulum.Time]: diff --git a/dlt/destinations/__init__.py b/dlt/destinations/__init__.py index 980c4ce7f2..801d1d823a 100644 --- a/dlt/destinations/__init__.py +++ b/dlt/destinations/__init__.py @@ -10,6 +10,7 @@ from dlt.destinations.impl.qdrant.factory import qdrant from dlt.destinations.impl.motherduck.factory import motherduck from dlt.destinations.impl.weaviate.factory import weaviate +from dlt.destinations.impl.databricks.factory import databricks __all__ = [ @@ -25,4 +26,5 @@ "qdrant", "motherduck", "weaviate", + "databricks", ] diff --git a/dlt/destinations/impl/athena/athena.py b/dlt/destinations/impl/athena/athena.py index 4837f0dbdf..91525d771c 100644 --- a/dlt/destinations/impl/athena/athena.py +++ b/dlt/destinations/impl/athena/athena.py @@ -232,7 +232,7 @@ def drop_tables(self, *tables: str) -> None: statements = [ f"DROP TABLE IF EXISTS {self.make_qualified_ddl_table_name(table)};" for table in tables ] - self.execute_fragments(statements) + self.execute_many(statements) @contextmanager @raise_database_error @@ -351,9 +351,7 @@ def _from_db_type( return self.type_mapper.from_db_type(hive_t, precision, scale) def _get_column_def_sql(self, c: TColumnSchema, table_format: TTableFormat = None) -> str: - return ( - f"{self.sql_client.escape_ddl_identifier(c['name'])} {self.type_mapper.to_db_type(c, table_format)}" - ) + return f"{self.sql_client.escape_ddl_identifier(c['name'])} {self.type_mapper.to_db_type(c, table_format)}" def _get_table_update_sql( self, table_name: str, new_columns: Sequence[TColumnSchema], generate_alter: bool @@ -378,15 +376,19 @@ def _get_table_update_sql( # use qualified table names qualified_table_name = self.sql_client.make_qualified_ddl_table_name(table_name) if is_iceberg and not generate_alter: - sql.append(f"""CREATE TABLE {qualified_table_name} + sql.append( + f"""CREATE TABLE {qualified_table_name} ({columns}) LOCATION '{location}' - TBLPROPERTIES ('table_type'='ICEBERG', 
'format'='parquet');""") + TBLPROPERTIES ('table_type'='ICEBERG', 'format'='parquet');""" + ) elif not generate_alter: - sql.append(f"""CREATE EXTERNAL TABLE {qualified_table_name} + sql.append( + f"""CREATE EXTERNAL TABLE {qualified_table_name} ({columns}) STORED AS PARQUET - LOCATION '{location}';""") + LOCATION '{location}';""" + ) # alter table to add new columns at the end else: sql.append(f"""ALTER TABLE {qualified_table_name} ADD COLUMNS ({columns});""") diff --git a/dlt/destinations/impl/bigquery/__init__.py b/dlt/destinations/impl/bigquery/__init__.py index 1304bd72bb..6d1491817a 100644 --- a/dlt/destinations/impl/bigquery/__init__.py +++ b/dlt/destinations/impl/bigquery/__init__.py @@ -20,5 +20,6 @@ def capabilities() -> DestinationCapabilitiesContext: caps.max_text_data_type_length = 10 * 1024 * 1024 caps.is_max_text_data_type_length_in_bytes = True caps.supports_ddl_transactions = False + caps.supports_clone_table = True return caps diff --git a/dlt/destinations/impl/bigquery/bigquery.py b/dlt/destinations/impl/bigquery/bigquery.py index 254184b96d..1058b1d2c9 100644 --- a/dlt/destinations/impl/bigquery/bigquery.py +++ b/dlt/destinations/impl/bigquery/bigquery.py @@ -30,6 +30,7 @@ from dlt.destinations.impl.bigquery.configuration import BigQueryClientConfiguration from dlt.destinations.impl.bigquery.sql_client import BigQuerySqlClient, BQ_TERMINAL_REASONS from dlt.destinations.job_client_impl import SqlJobClientWithStaging +from dlt.destinations.sql_jobs import SqlMergeJob, SqlJobParams from dlt.destinations.job_impl import NewReferenceJob from dlt.destinations.sql_client import SqlClientBase from dlt.destinations.sql_jobs import SqlMergeJob, SqlStagingCopyJob, SqlJobParams @@ -149,28 +150,6 @@ def gen_key_table_clauses( return sql -class BigqueryStagingCopyJob(SqlStagingCopyJob): - @classmethod - def generate_sql( - cls, - table_chain: Sequence[TTableSchema], - sql_client: SqlClientBase[Any], - params: Optional[SqlJobParams] = None, - ) -> List[str]: - sql: List[str] = [] - for table in table_chain: - with sql_client.with_staging_dataset(staging=True): - staging_table_name = sql_client.make_qualified_table_name(table["name"]) - table_name = sql_client.make_qualified_table_name(table["name"]) - sql.extend( - ( - f"DROP TABLE IF EXISTS {table_name};", - f"CREATE TABLE {table_name} CLONE {staging_table_name};", - ) - ) - return sql - - class BigQueryClient(SqlJobClientWithStaging, SupportsStagingDestination): capabilities: ClassVar[DestinationCapabilitiesContext] = capabilities() @@ -190,13 +169,6 @@ def __init__(self, schema: Schema, config: BigQueryClientConfiguration) -> None: def _create_merge_followup_jobs(self, table_chain: Sequence[TTableSchema]) -> List[NewLoadJob]: return [BigQueryMergeJob.from_table_chain(table_chain, self.sql_client)] - def _create_replace_followup_jobs( - self, table_chain: Sequence[TTableSchema] - ) -> List[NewLoadJob]: - if self.config.replace_strategy == "staging-optimized": - return [BigqueryStagingCopyJob.from_table_chain(table_chain, self.sql_client)] - return super()._create_replace_followup_jobs(table_chain) - def restore_file_load(self, file_path: str) -> LoadJob: """Returns a completed SqlLoadJob or restored BigQueryLoadJob @@ -280,9 +252,9 @@ def _get_table_update_sql( elif (c := partition_list[0])["data_type"] == "date": sql[0] = f"{sql[0]}\nPARTITION BY {self.capabilities.escape_identifier(c['name'])}" elif (c := partition_list[0])["data_type"] == "timestamp": - sql[0] = ( - f"{sql[0]}\nPARTITION BY 
DATE({self.capabilities.escape_identifier(c['name'])})" - ) + sql[ + 0 + ] = f"{sql[0]}\nPARTITION BY DATE({self.capabilities.escape_identifier(c['name'])})" # Automatic partitioning of an INT64 type requires us to be prescriptive - we treat the column as a UNIX timestamp. # This is due to the bounds requirement of GENERATE_ARRAY function for partitioning. # The 10,000 partitions limit makes it infeasible to cover the entire `bigint` range. @@ -300,9 +272,7 @@ def _get_table_update_sql( def _get_column_def_sql(self, c: TColumnSchema, table_format: TTableFormat = None) -> str: name = self.capabilities.escape_identifier(c["name"]) - return ( - f"{name} {self.type_mapper.to_db_type(c, table_format)} {self._gen_not_null(c.get('nullable', True))}" - ) + return f"{name} {self.type_mapper.to_db_type(c, table_format)} {self._gen_not_null(c.get('nullable', True))}" def get_storage_table(self, table_name: str) -> Tuple[bool, TTableSchemaColumns]: schema_table: TTableSchemaColumns = {} diff --git a/dlt/destinations/impl/databricks/__init__.py b/dlt/destinations/impl/databricks/__init__.py new file mode 100644 index 0000000000..f63d294818 --- /dev/null +++ b/dlt/destinations/impl/databricks/__init__.py @@ -0,0 +1,30 @@ +from dlt.common.destination import DestinationCapabilitiesContext +from dlt.common.data_writers.escape import escape_databricks_identifier, escape_databricks_literal +from dlt.common.arithmetics import DEFAULT_NUMERIC_PRECISION, DEFAULT_NUMERIC_SCALE + +from dlt.destinations.impl.databricks.configuration import DatabricksClientConfiguration + + +def capabilities() -> DestinationCapabilitiesContext: + caps = DestinationCapabilitiesContext() + caps.preferred_loader_file_format = "insert_values" + caps.supported_loader_file_formats = ["insert_values"] + caps.preferred_staging_file_format = "jsonl" + caps.supported_staging_file_formats = ["jsonl", "parquet"] + caps.escape_identifier = escape_databricks_identifier + caps.escape_literal = escape_databricks_literal + caps.decimal_precision = (DEFAULT_NUMERIC_PRECISION, DEFAULT_NUMERIC_SCALE) + caps.wei_precision = (DEFAULT_NUMERIC_PRECISION, 0) + caps.max_identifier_length = 255 + caps.max_column_identifier_length = 255 + caps.max_query_length = 2 * 1024 * 1024 + caps.is_max_query_length_in_bytes = True + caps.max_text_data_type_length = 16 * 1024 * 1024 + caps.is_max_text_data_type_length_in_bytes = True + caps.supports_ddl_transactions = False + caps.supports_truncate_command = True + # caps.supports_transactions = False + caps.alter_add_multi_column = True + caps.supports_multiple_statements = False + caps.supports_clone_table = True + return caps diff --git a/dlt/destinations/impl/databricks/configuration.py b/dlt/destinations/impl/databricks/configuration.py new file mode 100644 index 0000000000..924047e30f --- /dev/null +++ b/dlt/destinations/impl/databricks/configuration.py @@ -0,0 +1,51 @@ +from typing import ClassVar, Final, Optional, Any, Dict, List + +from dlt.common.typing import TSecretStrValue +from dlt.common.configuration.exceptions import ConfigurationValueError +from dlt.common.configuration.specs.base_configuration import CredentialsConfiguration, configspec +from dlt.common.destination.reference import DestinationClientDwhWithStagingConfiguration + + +@configspec +class DatabricksCredentials(CredentialsConfiguration): + catalog: str = None + server_hostname: str = None + http_path: str = None + access_token: Optional[TSecretStrValue] = None + http_headers: Optional[Dict[str, str]] = None + session_configuration: 
Optional[Dict[str, Any]] = None + """Dict of session parameters that will be passed to `databricks.sql.connect`""" + connection_parameters: Optional[Dict[str, Any]] = None + """Additional keyword arguments that are passed to `databricks.sql.connect`""" + socket_timeout: Optional[int] = 180 + + __config_gen_annotations__: ClassVar[List[str]] = [ + "server_hostname", + "http_path", + "catalog", + "access_token", + ] + + def to_connector_params(self) -> Dict[str, Any]: + return dict( + catalog=self.catalog, + server_hostname=self.server_hostname, + http_path=self.http_path, + access_token=self.access_token, + session_configuration=self.session_configuration or {}, + _socket_timeout=self.socket_timeout, + **(self.connection_parameters or {}), + ) + + +@configspec +class DatabricksClientConfiguration(DestinationClientDwhWithStagingConfiguration): + destination_type: Final[str] = "databricks" # type: ignore[misc] + credentials: DatabricksCredentials + + def __str__(self) -> str: + """Return displayable destination location""" + if self.staging_config: + return str(self.staging_config.credentials) + else: + return "[no staging set]" diff --git a/dlt/destinations/impl/databricks/databricks.py b/dlt/destinations/impl/databricks/databricks.py new file mode 100644 index 0000000000..b5a404302f --- /dev/null +++ b/dlt/destinations/impl/databricks/databricks.py @@ -0,0 +1,317 @@ +from typing import ClassVar, Dict, Optional, Sequence, Tuple, List, Any, Iterable, Type, cast +from urllib.parse import urlparse, urlunparse + +from dlt.common.destination import DestinationCapabilitiesContext +from dlt.common.destination.reference import ( + FollowupJob, + NewLoadJob, + TLoadJobState, + LoadJob, + CredentialsConfiguration, + SupportsStagingDestination, +) +from dlt.common.configuration.specs import ( + AwsCredentialsWithoutDefaults, + AzureCredentials, + AzureCredentialsWithoutDefaults, +) +from dlt.common.data_types import TDataType +from dlt.common.storages.file_storage import FileStorage +from dlt.common.schema import TColumnSchema, Schema, TTableSchemaColumns +from dlt.common.schema.typing import TTableSchema, TColumnType, TSchemaTables, TTableFormat +from dlt.common.schema.utils import table_schema_has_type + + +from dlt.destinations.insert_job_client import InsertValuesJobClient +from dlt.destinations.job_impl import EmptyLoadJob +from dlt.destinations.exceptions import LoadJobTerminalException + +from dlt.destinations.impl.databricks import capabilities +from dlt.destinations.impl.databricks.configuration import DatabricksClientConfiguration +from dlt.destinations.impl.databricks.sql_client import DatabricksSqlClient +from dlt.destinations.sql_jobs import SqlMergeJob, SqlJobParams +from dlt.destinations.job_impl import NewReferenceJob +from dlt.destinations.sql_client import SqlClientBase +from dlt.destinations.type_mapping import TypeMapper +from dlt.common.storages import FilesystemConfiguration, fsspec_from_config +from dlt import config + + +class DatabricksTypeMapper(TypeMapper): + sct_to_unbound_dbt = { + "complex": "STRING", # Databricks supports complex types like ARRAY + "text": "STRING", + "double": "DOUBLE", + "bool": "BOOLEAN", + "date": "DATE", + "timestamp": "TIMESTAMP", # TIMESTAMP for local timezone + "bigint": "BIGINT", + "binary": "BINARY", + "decimal": "DECIMAL", # DECIMAL(p,s) format + "time": "STRING", + } + + dbt_to_sct = { + "STRING": "text", + "DOUBLE": "double", + "BOOLEAN": "bool", + "DATE": "date", + "TIMESTAMP": "timestamp", + "BIGINT": "bigint", + "INT": "bigint", + 
"SMALLINT": "bigint", + "TINYINT": "bigint", + "BINARY": "binary", + "DECIMAL": "decimal", + } + + sct_to_dbt = { + "decimal": "DECIMAL(%i,%i)", + "wei": "DECIMAL(%i,%i)", + } + + def to_db_integer_type( + self, precision: Optional[int], table_format: TTableFormat = None + ) -> str: + if precision is None: + return "BIGINT" + if precision <= 8: + return "TINYINT" + if precision <= 16: + return "SMALLINT" + if precision <= 32: + return "INT" + return "BIGINT" + + def from_db_type( + self, db_type: str, precision: Optional[int] = None, scale: Optional[int] = None + ) -> TColumnType: + # precision and scale arguments here are meaningless as they're not included separately in information schema + # We use full_data_type from databricks which is either in form "typename" or "typename(precision, scale)" + type_parts = db_type.split("(") + if len(type_parts) > 1: + db_type = type_parts[0] + scale_str = type_parts[1].strip(")") + precision, scale = [int(val) for val in scale_str.split(",")] + else: + scale = precision = None + db_type = db_type.upper() + if db_type == "DECIMAL": + if (precision, scale) == self.wei_precision(): + return dict(data_type="wei", precision=precision, scale=scale) + return super().from_db_type(db_type, precision, scale) + + +class DatabricksLoadJob(LoadJob, FollowupJob): + def __init__( + self, + table: TTableSchema, + file_path: str, + table_name: str, + load_id: str, + client: DatabricksSqlClient, + staging_config: FilesystemConfiguration, + ) -> None: + file_name = FileStorage.get_file_name_from_file_path(file_path) + super().__init__(file_name) + staging_credentials = staging_config.credentials + + qualified_table_name = client.make_qualified_table_name(table_name) + + # extract and prepare some vars + bucket_path = orig_bucket_path = ( + NewReferenceJob.resolve_reference(file_path) + if NewReferenceJob.is_reference_job(file_path) + else "" + ) + file_name = ( + FileStorage.get_file_name_from_file_path(bucket_path) if bucket_path else file_name + ) + from_clause = "" + credentials_clause = "" + format_options_clause = "" + + if bucket_path: + bucket_url = urlparse(bucket_path) + bucket_scheme = bucket_url.scheme + # referencing an staged files via a bucket URL requires explicit AWS credentials + if bucket_scheme == "s3" and isinstance( + staging_credentials, AwsCredentialsWithoutDefaults + ): + s3_creds = staging_credentials.to_session_credentials() + credentials_clause = f"""WITH(CREDENTIAL( + AWS_ACCESS_KEY='{s3_creds["aws_access_key_id"]}', + AWS_SECRET_KEY='{s3_creds["aws_secret_access_key"]}', + + AWS_SESSION_TOKEN='{s3_creds["aws_session_token"]}' + )) + """ + from_clause = f"FROM '{bucket_path}'" + elif bucket_scheme in ["az", "abfs"] and isinstance( + staging_credentials, AzureCredentialsWithoutDefaults + ): + # Explicit azure credentials are needed to load from bucket without a named stage + credentials_clause = f"""WITH(CREDENTIAL(AZURE_SAS_TOKEN='{staging_credentials.azure_storage_sas_token}'))""" + # Converts an az:/// to abfss://@.dfs.core.windows.net/ + # as required by snowflake + _path = bucket_url.path + bucket_path = urlunparse( + bucket_url._replace( + scheme="abfss", + netloc=f"{bucket_url.netloc}@{staging_credentials.azure_storage_account_name}.dfs.core.windows.net", + path=_path, + ) + ) + from_clause = f"FROM '{bucket_path}'" + else: + raise LoadJobTerminalException( + file_path, + f"Databricks cannot load data from staging bucket {bucket_path}. 
Only s3 and azure buckets are supported", + ) + else: + raise LoadJobTerminalException( + file_path, + "Cannot load from local file. Databricks does not support loading from local files. Configure staging with an s3 or azure storage bucket.", + ) + + # decide on source format, stage_file_path will either be a local file or a bucket path + if file_name.endswith(".parquet"): + source_format = "PARQUET" # Only parquet is supported + elif file_name.endswith(".jsonl"): + if not config.get("data_writer.disable_compression"): + raise LoadJobTerminalException( + file_path, + "Databricks loader does not support gzip compressed JSON files. Please disable compression in the data writer configuration: https://dlthub.com/docs/reference/performance#disabling-and-enabling-file-compression", + ) + if table_schema_has_type(table, "decimal"): + raise LoadJobTerminalException( + file_path, + "Databricks loader cannot load DECIMAL type columns from json files. Switch to parquet format to load decimals.", + ) + if table_schema_has_type(table, "binary"): + raise LoadJobTerminalException( + file_path, + "Databricks loader cannot load BINARY type columns from json files. Switch to parquet format to load byte values.", + ) + if table_schema_has_type(table, "complex"): + raise LoadJobTerminalException( + file_path, + "Databricks loader cannot load complex columns (lists and dicts) from json files. Switch to parquet format to load complex types.", + ) + if table_schema_has_type(table, "date"): + raise LoadJobTerminalException( + file_path, + "Databricks loader cannot load DATE type columns from json files. Switch to parquet format to load dates.", + ) + + source_format = "JSON" + format_options_clause = "FORMAT_OPTIONS('inferTimestamp'='true')" + # Databricks fails when trying to load empty json files, so we have to check the file size + fs, _ = fsspec_from_config(staging_config) + file_size = fs.size(orig_bucket_path) + if file_size == 0: # Empty file, do nothing + return + + statement = f"""COPY INTO {qualified_table_name} + {from_clause} + {credentials_clause} + FILEFORMAT = {source_format} + {format_options_clause} + """ + client.execute_sql(statement) + + def state(self) -> TLoadJobState: + return "completed" + + def exception(self) -> str: + raise NotImplementedError() + + +class DatabricksMergeJob(SqlMergeJob): + @classmethod + def _to_temp_table(cls, select_sql: str, temp_table_name: str) -> str: + return f"CREATE TEMPORARY VIEW {temp_table_name} AS {select_sql};" + + @classmethod + def gen_delete_from_sql( + cls, table_name: str, column_name: str, temp_table_name: str, temp_table_column: str + ) -> str: + # Databricks does not support subqueries in DELETE FROM statements so we use a MERGE statement instead + return f"""MERGE INTO {table_name} + USING {temp_table_name} + ON {table_name}.{column_name} = {temp_table_name}.{temp_table_column} + WHEN MATCHED THEN DELETE; + """ + + +class DatabricksClient(InsertValuesJobClient, SupportsStagingDestination): + capabilities: ClassVar[DestinationCapabilitiesContext] = capabilities() + + def __init__(self, schema: Schema, config: DatabricksClientConfiguration) -> None: + sql_client = DatabricksSqlClient(config.normalize_dataset_name(schema), config.credentials) + super().__init__(schema, config, sql_client) + self.config: DatabricksClientConfiguration = config + self.sql_client: DatabricksSqlClient = sql_client + self.type_mapper = DatabricksTypeMapper(self.capabilities) + + def start_file_load(self, table: TTableSchema, file_path: str, load_id: str) -> LoadJob: + job 
= super().start_file_load(table, file_path, load_id) + + if not job: + job = DatabricksLoadJob( + table, + file_path, + table["name"], + load_id, + self.sql_client, + staging_config=cast(FilesystemConfiguration, self.config.staging_config), + ) + return job + + def restore_file_load(self, file_path: str) -> LoadJob: + return EmptyLoadJob.from_file_path(file_path, "completed") + + def _create_merge_followup_jobs(self, table_chain: Sequence[TTableSchema]) -> List[NewLoadJob]: + return [DatabricksMergeJob.from_table_chain(table_chain, self.sql_client)] + + def _make_add_column_sql( + self, new_columns: Sequence[TColumnSchema], table_format: TTableFormat = None + ) -> List[str]: + # Override because databricks requires multiple columns in a single ADD COLUMN clause + return ["ADD COLUMN\n" + ",\n".join(self._get_column_def_sql(c) for c in new_columns)] + + def _get_table_update_sql( + self, + table_name: str, + new_columns: Sequence[TColumnSchema], + generate_alter: bool, + separate_alters: bool = False, + ) -> List[str]: + sql = super()._get_table_update_sql(table_name, new_columns, generate_alter) + + cluster_list = [ + self.capabilities.escape_identifier(c["name"]) for c in new_columns if c.get("cluster") + ] + + if cluster_list: + sql[0] = sql[0] + "\nCLUSTER BY (" + ",".join(cluster_list) + ")" + + return sql + + def _from_db_type( + self, bq_t: str, precision: Optional[int], scale: Optional[int] + ) -> TColumnType: + return self.type_mapper.from_db_type(bq_t, precision, scale) + + def _get_column_def_sql(self, c: TColumnSchema, table_format: TTableFormat = None) -> str: + name = self.capabilities.escape_identifier(c["name"]) + return ( + f"{name} {self.type_mapper.to_db_type(c)} {self._gen_not_null(c.get('nullable', True))}" + ) + + def _get_storage_table_query_columns(self) -> List[str]: + fields = super()._get_storage_table_query_columns() + fields[ + 1 + ] = "full_data_type" # Override because this is the only way to get data type with precision + return fields diff --git a/dlt/destinations/impl/databricks/factory.py b/dlt/destinations/impl/databricks/factory.py new file mode 100644 index 0000000000..7c6c95137d --- /dev/null +++ b/dlt/destinations/impl/databricks/factory.py @@ -0,0 +1,48 @@ +import typing as t + +from dlt.common.destination import Destination, DestinationCapabilitiesContext + +from dlt.destinations.impl.databricks.configuration import ( + DatabricksCredentials, + DatabricksClientConfiguration, +) +from dlt.destinations.impl.databricks import capabilities + +if t.TYPE_CHECKING: + from dlt.destinations.impl.databricks.databricks import DatabricksClient + + +class databricks(Destination[DatabricksClientConfiguration, "DatabricksClient"]): + spec = DatabricksClientConfiguration + + def capabilities(self) -> DestinationCapabilitiesContext: + return capabilities() + + @property + def client_class(self) -> t.Type["DatabricksClient"]: + from dlt.destinations.impl.databricks.databricks import DatabricksClient + + return DatabricksClient + + def __init__( + self, + credentials: t.Union[DatabricksCredentials, t.Dict[str, t.Any], str] = None, + destination_name: t.Optional[str] = None, + environment: t.Optional[str] = None, + **kwargs: t.Any, + ) -> None: + """Configure the Databricks destination to use in a pipeline. + + All arguments provided here supersede other configuration sources such as environment variables and dlt config files. + + Args: + credentials: Credentials to connect to the databricks database. 
Can be an instance of `DatabricksCredentials` or + a connection string in the format `databricks://user:password@host:port/database` + **kwargs: Additional arguments passed to the destination config + """ + super().__init__( + credentials=credentials, + destination_name=destination_name, + environment=environment, + **kwargs, + ) diff --git a/dlt/destinations/impl/databricks/sql_client.py b/dlt/destinations/impl/databricks/sql_client.py new file mode 100644 index 0000000000..68ea863cc4 --- /dev/null +++ b/dlt/destinations/impl/databricks/sql_client.py @@ -0,0 +1,155 @@ +from contextlib import contextmanager, suppress +from typing import Any, AnyStr, ClassVar, Iterator, Optional, Sequence, List, Union, Dict + +from databricks import sql as databricks_lib +from databricks.sql.client import ( + Connection as DatabricksSqlConnection, + Cursor as DatabricksSqlCursor, +) +from databricks.sql.exc import Error as DatabricksSqlError + +from dlt.common import pendulum +from dlt.common import logger +from dlt.common.destination import DestinationCapabilitiesContext +from dlt.destinations.exceptions import ( + DatabaseTerminalException, + DatabaseTransientException, + DatabaseUndefinedRelation, +) +from dlt.destinations.sql_client import ( + DBApiCursorImpl, + SqlClientBase, + raise_database_error, + raise_open_connection_error, +) +from dlt.destinations.typing import DBApi, DBApiCursor, DBTransaction +from dlt.destinations.impl.databricks.configuration import DatabricksCredentials +from dlt.destinations.impl.databricks import capabilities +from dlt.common.time import to_py_date, to_py_datetime + + +class DatabricksSqlClient(SqlClientBase[DatabricksSqlConnection], DBTransaction): + dbapi: ClassVar[DBApi] = databricks_lib + capabilities: ClassVar[DestinationCapabilitiesContext] = capabilities() + + def __init__(self, dataset_name: str, credentials: DatabricksCredentials) -> None: + super().__init__(credentials.catalog, dataset_name) + self._conn: DatabricksSqlConnection = None + self.credentials = credentials + + def open_connection(self) -> DatabricksSqlConnection: + conn_params = self.credentials.to_connector_params() + self._conn = databricks_lib.connect(**conn_params, schema=self.dataset_name) + return self._conn + + @raise_open_connection_error + def close_connection(self) -> None: + if self._conn: + self._conn.close() + self._conn = None + + @contextmanager + def begin_transaction(self) -> Iterator[DBTransaction]: + # Databricks does not support transactions + yield self + + @raise_database_error + def commit_transaction(self) -> None: + # Databricks does not support transactions + pass + + @raise_database_error + def rollback_transaction(self) -> None: + # Databricks does not support transactions + pass + + @property + def native_connection(self) -> "DatabricksSqlConnection": + return self._conn + + def drop_dataset(self) -> None: + self.execute_sql("DROP SCHEMA IF EXISTS %s CASCADE;" % self.fully_qualified_dataset_name()) + + def drop_tables(self, *tables: str) -> None: + # Tables are drop with `IF EXISTS`, but databricks raises when the schema doesn't exist. + # Multi statement exec is safe and the error can be ignored since all tables are in the same schema. 
+ with suppress(DatabaseUndefinedRelation): + super().drop_tables(*tables) + + def execute_sql( + self, sql: AnyStr, *args: Any, **kwargs: Any + ) -> Optional[Sequence[Sequence[Any]]]: + with self.execute_query(sql, *args, **kwargs) as curr: + if curr.description is None: + return None + else: + f = curr.fetchall() + return f + + @contextmanager + @raise_database_error + def execute_query(self, query: AnyStr, *args: Any, **kwargs: Any) -> Iterator[DBApiCursor]: + curr: DBApiCursor = None + # TODO: databricks connector 3.0.0 will use :named paramstyle only + # if args: + # keys = [f"arg{i}" for i in range(len(args))] + # # Replace position arguments (%s) with named arguments (:arg0, :arg1, ...) + # # query = query % tuple(f":{key}" for key in keys) + # db_args = {} + # for key, db_arg in zip(keys, args): + # # Databricks connector doesn't accept pendulum objects + # if isinstance(db_arg, pendulum.DateTime): + # db_arg = to_py_datetime(db_arg) + # elif isinstance(db_arg, pendulum.Date): + # db_arg = to_py_date(db_arg) + # db_args[key] = db_arg + # else: + # db_args = None + db_args: Optional[Union[Dict[str, Any], Sequence[Any]]] + if kwargs: + db_args = kwargs + elif args: + db_args = args + else: + db_args = None + with self._conn.cursor() as curr: + curr.execute(query, db_args) + yield DBApiCursorImpl(curr) # type: ignore[abstract] + + def fully_qualified_dataset_name(self, escape: bool = True) -> str: + if escape: + catalog = self.capabilities.escape_identifier(self.credentials.catalog) + dataset_name = self.capabilities.escape_identifier(self.dataset_name) + else: + catalog = self.credentials.catalog + dataset_name = self.dataset_name + return f"{catalog}.{dataset_name}" + + @staticmethod + def _make_database_exception(ex: Exception) -> Exception: + if isinstance(ex, databricks_lib.ServerOperationError): + if "TABLE_OR_VIEW_NOT_FOUND" in str(ex): + return DatabaseUndefinedRelation(ex) + elif "SCHEMA_NOT_FOUND" in str(ex): + return DatabaseUndefinedRelation(ex) + elif "PARSE_SYNTAX_ERROR" in str(ex): + return DatabaseTransientException(ex) + return DatabaseTerminalException(ex) + elif isinstance(ex, databricks_lib.OperationalError): + return DatabaseTerminalException(ex) + elif isinstance(ex, (databricks_lib.ProgrammingError, databricks_lib.IntegrityError)): + return DatabaseTerminalException(ex) + elif isinstance(ex, databricks_lib.DatabaseError): + return DatabaseTransientException(ex) + else: + return DatabaseTransientException(ex) + + @staticmethod + def _maybe_make_terminal_exception_from_data_error( + databricks_ex: databricks_lib.DatabaseError, + ) -> Optional[Exception]: + return None + + @staticmethod + def is_dbapi_exception(ex: Exception) -> bool: + return isinstance(ex, databricks_lib.DatabaseError) diff --git a/dlt/destinations/impl/mssql/sql_client.py b/dlt/destinations/impl/mssql/sql_client.py index 427518feeb..53ed7cfd90 100644 --- a/dlt/destinations/impl/mssql/sql_client.py +++ b/dlt/destinations/impl/mssql/sql_client.py @@ -115,7 +115,7 @@ def _drop_views(self, *tables: str) -> None: statements = [ f"DROP VIEW IF EXISTS {self.make_qualified_table_name(table)};" for table in tables ] - self.execute_fragments(statements) + self.execute_many(statements) def execute_sql( self, sql: AnyStr, *args: Any, **kwargs: Any diff --git a/dlt/destinations/impl/snowflake/__init__.py b/dlt/destinations/impl/snowflake/__init__.py index d6bebd3fdd..dde4d5a382 100644 --- a/dlt/destinations/impl/snowflake/__init__.py +++ b/dlt/destinations/impl/snowflake/__init__.py @@ -21,4 +21,5 @@ def 
capabilities() -> DestinationCapabilitiesContext: caps.is_max_text_data_type_length_in_bytes = True caps.supports_ddl_transactions = True caps.alter_add_multi_column = True + caps.supports_clone_table = True return caps diff --git a/dlt/destinations/impl/snowflake/snowflake.py b/dlt/destinations/impl/snowflake/snowflake.py index 67df78c138..fb51ab9d36 100644 --- a/dlt/destinations/impl/snowflake/snowflake.py +++ b/dlt/destinations/impl/snowflake/snowflake.py @@ -27,7 +27,7 @@ from dlt.destinations.impl.snowflake import capabilities from dlt.destinations.impl.snowflake.configuration import SnowflakeClientConfiguration from dlt.destinations.impl.snowflake.sql_client import SnowflakeSqlClient -from dlt.destinations.sql_jobs import SqlStagingCopyJob, SqlJobParams +from dlt.destinations.sql_jobs import SqlJobParams from dlt.destinations.impl.snowflake.sql_client import SnowflakeSqlClient from dlt.destinations.job_impl import NewReferenceJob from dlt.destinations.sql_client import SqlClientBase @@ -175,13 +175,15 @@ def __init__( f'PUT file://{file_path} @{stage_name}/"{load_id}" OVERWRITE = TRUE,' " AUTO_COMPRESS = FALSE" ) - client.execute_sql(f"""COPY INTO {qualified_table_name} + client.execute_sql( + f"""COPY INTO {qualified_table_name} {from_clause} {files_clause} {credentials_clause} FILE_FORMAT = {source_format} MATCH_BY_COLUMN_NAME='CASE_INSENSITIVE' - """) + """ + ) if stage_file_path and not keep_staged_files: client.execute_sql(f"REMOVE {stage_file_path}") @@ -192,25 +194,6 @@ def exception(self) -> str: raise NotImplementedError() -class SnowflakeStagingCopyJob(SqlStagingCopyJob): - @classmethod - def generate_sql( - cls, - table_chain: Sequence[TTableSchema], - sql_client: SqlClientBase[Any], - params: Optional[SqlJobParams] = None, - ) -> List[str]: - sql: List[str] = [] - for table in table_chain: - with sql_client.with_staging_dataset(staging=True): - staging_table_name = sql_client.make_qualified_table_name(table["name"]) - table_name = sql_client.make_qualified_table_name(table["name"]) - sql.append(f"DROP TABLE IF EXISTS {table_name};") - # recreate destination table with data cloned from staging table - sql.append(f"CREATE TABLE {table_name} CLONE {staging_table_name};") - return sql - - class SnowflakeClient(SqlJobClientWithStaging, SupportsStagingDestination): capabilities: ClassVar[DestinationCapabilitiesContext] = capabilities() @@ -250,13 +233,6 @@ def _make_add_column_sql( + ",\n".join(self._get_column_def_sql(c, table_format) for c in new_columns) ] - def _create_replace_followup_jobs( - self, table_chain: Sequence[TTableSchema] - ) -> List[NewLoadJob]: - if self.config.replace_strategy == "staging-optimized": - return [SnowflakeStagingCopyJob.from_table_chain(table_chain, self.sql_client)] - return super()._create_replace_followup_jobs(table_chain) - def _get_table_update_sql( self, table_name: str, diff --git a/dlt/destinations/job_client_impl.py b/dlt/destinations/job_client_impl.py index ac68cfea8a..e7dc4bcbe2 100644 --- a/dlt/destinations/job_client_impl.py +++ b/dlt/destinations/job_client_impl.py @@ -50,7 +50,6 @@ FollowupJob, CredentialsConfiguration, ) -from dlt.common.utils import concat_strings_with_limit from dlt.destinations.exceptions import ( DatabaseUndefinedRelation, DestinationSchemaTampered, @@ -76,15 +75,19 @@ def __init__(self, file_path: str, sql_client: SqlClientBase[Any]) -> None: with FileStorage.open_zipsafe_ro(file_path, "r", encoding="utf-8") as f: sql = f.read() + # Some clients (e.g. 
databricks) do not support multiple statements in one execute call + if not sql_client.capabilities.supports_multiple_statements: + sql_client.execute_many(self._split_fragments(sql)) # if we detect ddl transactions, only execute transaction if supported by client - if ( + elif ( not self._string_containts_ddl_queries(sql) or sql_client.capabilities.supports_ddl_transactions ): # with sql_client.begin_transaction(): sql_client.execute_sql(sql) else: - sql_client.execute_sql(sql) + # sql_client.execute_sql(sql) + sql_client.execute_many(self._split_fragments(sql)) def state(self) -> TLoadJobState: # this job is always done @@ -100,6 +103,9 @@ def _string_containts_ddl_queries(self, sql: str) -> bool: return True return False + def _split_fragments(self, sql: str) -> List[str]: + return [s + (";" if not s.endswith(";") else "") for s in sql.split(";") if s.strip()] + @staticmethod def is_sql_job(file_path: str) -> bool: return os.path.splitext(file_path)[1][1:] == "sql" @@ -295,6 +301,15 @@ def __exit__( ) -> None: self.sql_client.close_connection() + def _get_storage_table_query_columns(self) -> List[str]: + """Column names used when querying table from information schema. + Override for databases that use different namings. + """ + fields = ["column_name", "data_type", "is_nullable"] + if self.capabilities.schema_supports_numeric_precision: + fields += ["numeric_precision", "numeric_scale"] + return fields + def get_storage_table(self, table_name: str) -> Tuple[bool, TTableSchemaColumns]: def _null_to_bool(v: str) -> bool: if v == "NO": @@ -303,9 +318,7 @@ def _null_to_bool(v: str) -> bool: return True raise ValueError(v) - fields = ["column_name", "data_type", "is_nullable"] - if self.capabilities.schema_supports_numeric_precision: - fields += ["numeric_precision", "numeric_scale"] + fields = self._get_storage_table_query_columns() db_params = self.sql_client.make_qualified_table_name(table_name, escape=False).split( ".", 3 ) @@ -383,10 +396,7 @@ def _execute_schema_update_sql(self, only_tables: Iterable[str]) -> TSchemaTable sql_scripts, schema_update = self._build_schema_update_sql(only_tables) # stay within max query size when doing DDL. 
some db backends use bytes not characters so decrease limit by half # assuming that most of the characters in DDL encode into single bytes - for sql_fragment in concat_strings_with_limit( - sql_scripts, "\n", self.capabilities.max_query_length // 2 - ): - self.sql_client.execute_sql(sql_fragment) + self.sql_client.execute_many(sql_scripts) self._update_schema_in_storage(self.schema) return schema_update diff --git a/dlt/destinations/sql_client.py b/dlt/destinations/sql_client.py index 1e5f7031a5..695f1a0972 100644 --- a/dlt/destinations/sql_client.py +++ b/dlt/destinations/sql_client.py @@ -19,8 +19,13 @@ from dlt.common.typing import TFun from dlt.common.destination import DestinationCapabilitiesContext +from dlt.common.utils import concat_strings_with_limit -from dlt.destinations.exceptions import DestinationConnectionError, LoadClientNotConnected +from dlt.destinations.exceptions import ( + DestinationConnectionError, + LoadClientNotConnected, + DatabaseTerminalException, +) from dlt.destinations.typing import DBApi, TNativeConn, DBApiCursor, DataFrame, DBTransaction @@ -86,7 +91,7 @@ def drop_dataset(self) -> None: def truncate_tables(self, *tables: str) -> None: statements = [self._truncate_table_sql(self.make_qualified_table_name(t)) for t in tables] - self.execute_fragments(statements) + self.execute_many(statements) def drop_tables(self, *tables: str) -> None: if not tables: @@ -94,7 +99,7 @@ def drop_tables(self, *tables: str) -> None: statements = [ f"DROP TABLE IF EXISTS {self.make_qualified_table_name(table)};" for table in tables ] - self.execute_fragments(statements) + self.execute_many(statements) @abstractmethod def execute_sql( @@ -114,6 +119,25 @@ def execute_fragments( """Executes several SQL fragments as efficiently as possible to prevent data copying. Default implementation just joins the strings and executes them together.""" return self.execute_sql("".join(fragments), *args, **kwargs) # type: ignore + def execute_many( + self, statements: Sequence[str], *args: Any, **kwargs: Any + ) -> Optional[Sequence[Sequence[Any]]]: + """Executes multiple SQL statements as efficiently as possible. When client supports multiple statements in a single query + they are executed together in as few database calls as possible. 
+ """ + ret = [] + if self.capabilities.supports_multiple_statements: + for sql_fragment in concat_strings_with_limit( + list(statements), "\n", self.capabilities.max_query_length // 2 + ): + ret.append(self.execute_sql(sql_fragment, *args, **kwargs)) + else: + for statement in statements: + result = self.execute_sql(statement, *args, **kwargs) + if result is not None: + ret.append(result) + return ret + @abstractmethod def fully_qualified_dataset_name(self, escape: bool = True) -> str: pass diff --git a/dlt/destinations/sql_jobs.py b/dlt/destinations/sql_jobs.py index d97a098669..d0911d0bea 100644 --- a/dlt/destinations/sql_jobs.py +++ b/dlt/destinations/sql_jobs.py @@ -74,11 +74,28 @@ class SqlStagingCopyJob(SqlBaseJob): failed_text: str = "Tried to generate a staging copy sql job for the following tables:" @classmethod - def generate_sql( + def _generate_clone_sql( cls, table_chain: Sequence[TTableSchema], sql_client: SqlClientBase[Any], - params: Optional[SqlJobParams] = None, + ) -> List[str]: + """Drop and clone the table for supported destinations""" + sql: List[str] = [] + for table in table_chain: + with sql_client.with_staging_dataset(staging=True): + staging_table_name = sql_client.make_qualified_table_name(table["name"]) + table_name = sql_client.make_qualified_table_name(table["name"]) + sql.append(f"DROP TABLE IF EXISTS {table_name};") + # recreate destination table with data cloned from staging table + sql.append(f"CREATE TABLE {table_name} CLONE {staging_table_name};") + return sql + + @classmethod + def _generate_insert_sql( + cls, + table_chain: Sequence[TTableSchema], + sql_client: SqlClientBase[Any], + params: SqlJobParams = None, ) -> List[str]: sql: List[str] = [] for table in table_chain: @@ -98,6 +115,17 @@ def generate_sql( ) return sql + @classmethod + def generate_sql( + cls, + table_chain: Sequence[TTableSchema], + sql_client: SqlClientBase[Any], + params: SqlJobParams = None, + ) -> List[str]: + if params["replace"] and sql_client.capabilities.supports_clone_table: + return cls._generate_clone_sql(table_chain, sql_client) + return cls._generate_insert_sql(table_chain, sql_client, params) + class SqlMergeJob(SqlBaseJob): """Generates a list of sql statements that merge the data from staging dataset into destination dataset.""" @@ -186,6 +214,21 @@ def gen_insert_temp_table_sql( """ return [cls._to_temp_table(select_statement, temp_table_name)], temp_table_name + @classmethod + def gen_delete_from_sql( + cls, + table_name: str, + unique_column: str, + delete_temp_table_name: str, + temp_table_column: str, + ) -> str: + """Generate DELETE FROM statement deleting the records found in the deletes temp table.""" + return f"""DELETE FROM {table_name} + WHERE {unique_column} IN ( + SELECT * FROM {delete_temp_table_name} + ); + """ + @classmethod def _new_temp_table_name(cls, name_prefix: str) -> str: return f"{name_prefix}_{uniq_id()}" @@ -261,12 +304,9 @@ def gen_merge_sql( unique_column, key_table_clauses ) sql.extend(create_delete_temp_table_sql) - # delete top table - sql.append( - f"DELETE FROM {root_table_name} WHERE {unique_column} IN (SELECT * FROM" - f" {delete_temp_table_name});" - ) - # delete other tables + + # delete from child tables first. 
This is important for databricks, which does not support temporary tables, + but uses temporary views instead for table in table_chain[1:]: table_name = sql_client.make_qualified_table_name(table["name"]) root_key_columns = get_columns_names_with_prop(table, "root_key") @@ -281,15 +321,25 @@ def gen_merge_sql( ) root_key_column = sql_client.capabilities.escape_identifier(root_key_columns[0]) sql.append( - f"DELETE FROM {table_name} WHERE {root_key_column} IN (SELECT * FROM" - f" {delete_temp_table_name});" + cls.gen_delete_from_sql( + table_name, root_key_column, delete_temp_table_name, unique_column + ) + ) + + # delete from top table now that child tables have been processed + sql.append( + cls.gen_delete_from_sql( + root_table_name, unique_column, delete_temp_table_name, unique_column ) + ) + # create temp table used to deduplicate, only when we have primary keys if primary_keys: - create_insert_temp_table_sql, insert_temp_table_name = ( - cls.gen_insert_temp_table_sql( - staging_root_table_name, primary_keys, unique_column - ) + ( + create_insert_temp_table_sql, + insert_temp_table_name, + ) = cls.gen_insert_temp_table_sql( + staging_root_table_name, primary_keys, unique_column ) sql.extend(create_insert_temp_table_sql)
diff --git a/dlt/helpers/dbt/profiles.yml b/dlt/helpers/dbt/profiles.yml index 2414222cbd..a9a30106b9 100644 --- a/dlt/helpers/dbt/profiles.yml +++ b/dlt/helpers/dbt/profiles.yml @@ -141,4 +141,17 @@ athena: schema: "{{ var('destination_dataset_name', var('source_dataset_name')) }}" database: "{{ env_var('DLT__AWS_DATA_CATALOG') }}" # aws_profile_name: "{{ env_var('DLT__CREDENTIALS__PROFILE_NAME', '') }}" - work_group: "{{ env_var('DLT__ATHENA_WORK_GROUP', '') }}" \ No newline at end of file + work_group: "{{ env_var('DLT__ATHENA_WORK_GROUP', '') }}" + + +databricks: + target: analytics + outputs: + analytics: + type: databricks + catalog: "{{ env_var('DLT__CREDENTIALS__CATALOG') }}" + schema: "{{ var('destination_dataset_name', var('source_dataset_name')) }}" + host: "{{ env_var('DLT__CREDENTIALS__SERVER_HOSTNAME') }}" + http_path: "{{ env_var('DLT__CREDENTIALS__HTTP_PATH') }}" + token: "{{ env_var('DLT__CREDENTIALS__ACCESS_TOKEN') }}" + threads: 4
diff --git a/dlt/pipeline/pipeline.py b/dlt/pipeline/pipeline.py index 73c8f076d1..3fa8da6aee 100644 --- a/dlt/pipeline/pipeline.py +++ b/dlt/pipeline/pipeline.py @@ -1163,9 +1163,9 @@ def _set_context(self, is_active: bool) -> None: # set destination context on activation if self.destination: # inject capabilities context - self._container[DestinationCapabilitiesContext] = ( - self._get_destination_capabilities() - ) + self._container[ + DestinationCapabilitiesContext + ] = self._get_destination_capabilities() else: # remove destination context on deactivation if DestinationCapabilitiesContext in self._container:
diff --git a/docs/website/docs/dlt-ecosystem/destinations/databricks.md b/docs/website/docs/dlt-ecosystem/destinations/databricks.md
new file mode 100644
index 0000000000..679988f918
--- /dev/null
+++ b/docs/website/docs/dlt-ecosystem/destinations/databricks.md
@@ -0,0 +1,110 @@
+---
+
+title: Databricks
+description: Databricks `dlt` destination
+keywords: [Databricks, destination, data warehouse]
+
+---
+
+# Databricks
+
+## Install dlt with Databricks
+**To install the DLT library with Databricks dependencies:**
+```
+pip install dlt[databricks]
+```
+
+## Setup Guide
+
+**1. Initialize a project with a pipeline that loads to Databricks by running**
+```
+dlt init chess databricks
+```
+
+**2. 
Install the necessary dependencies for Databricks by running**
+```
+pip install -r requirements.txt
+```
+This will install dlt with the **databricks** extra, which contains the Databricks Python dbapi client.
+
+**3. Enter your credentials into `.dlt/secrets.toml`.**
+
+This should have your connection parameters and your personal access token.
+
+It should now look like:
+
+```toml
+[destination.databricks.credentials]
+server_hostname = "MY_DATABRICKS.azuredatabricks.net"
+http_path = "/sql/1.0/warehouses/12345"
+access_token = "MY_ACCESS_TOKEN"
+catalog = "my_catalog"
+```
+
+## Write disposition
+All write dispositions are supported.
+
+## Data loading
+Data is loaded using `INSERT VALUES` statements by default.
+
+Efficient loading from a staging filesystem is also supported by configuring an Amazon S3 or Azure Blob Storage bucket as a staging destination. When staging is enabled, `dlt` will upload data in `parquet` files to the bucket and then use `COPY INTO` statements to ingest the data into Databricks.
+For more information on staging, see the [staging support](#staging-support) section below.
+
+## Supported file formats
+* [insert-values](../file-formats/insert-format.md) is used by default
+* [jsonl](../file-formats/jsonl.md) is supported when staging is enabled (see limitations below)
+* [parquet](../file-formats/parquet.md) is supported when staging is enabled
+
+The `jsonl` format has some limitations when used with Databricks:
+
+1. Compression must be disabled to load jsonl files in Databricks. Set `data_writer.disable_compression` to `true` in the dlt config when using this format.
+2. The following data types are not supported when using the `jsonl` format with `databricks`: `decimal`, `complex`, `date`, `binary`. Use `parquet` if your data contains these types.
+3. The `bigint` data type with precision is not supported with the `jsonl` format.
+
+
+## Staging support
+
+Databricks supports both Amazon S3 and Azure Blob Storage as staging locations. `dlt` will upload files in `parquet` format to the staging location and will instruct Databricks to load data from there.
+
+### Databricks and Amazon S3
+
+Please refer to the [S3 documentation](./filesystem.md#aws-s3) for details on connecting your S3 bucket with the bucket_url and credentials.
+
+Example to set up Databricks with S3 as a staging destination:
+
+```python
+import dlt
+
+# Create a dlt pipeline that will load
+# chess player data to the Databricks destination
+# via staging on s3
+pipeline = dlt.pipeline(
+    pipeline_name='chess_pipeline',
+    destination='databricks',
+    staging=dlt.destinations.filesystem('s3://your-bucket-name'), # add this to activate the staging location
+    dataset_name='player_data',
+)
+```
+
+### Databricks and Azure Blob Storage
+
+Refer to the [Azure Blob Storage filesystem documentation](./filesystem.md#azure-blob-storage) for details on connecting your Azure Blob Storage container with the bucket_url and credentials.
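+
+As a minimal sketch, the staging bucket and its credentials could be configured in `.dlt/secrets.toml` roughly as below. The container name and credential values are placeholders; the exact credential fields to use are described in the filesystem documentation linked above.
+
+```toml
+# staging filesystem destination used by the Databricks COPY INTO load
+[destination.filesystem]
+bucket_url = "az://your-container-name"  # placeholder container
+
+[destination.filesystem.credentials]
+azure_storage_account_name = "YOUR_ACCOUNT_NAME"  # placeholder
+azure_storage_account_key = "YOUR_ACCOUNT_KEY"    # placeholder
+```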
+ +Example to set up Databricks with Azure as a staging destination: + +```python +# Create a dlt pipeline that will load +# chess player data to the Databricks destination +# via staging on Azure Blob Storage +pipeline = dlt.pipeline( + pipeline_name='chess_pipeline', + destination='databricks', + staging=dlt.destinations.filesystem('az://your-container-name'), # add this to activate the staging location + dataset_name='player_data' +) +``` +### dbt support +This destination [integrates with dbt](../transformations/dbt/dbt.md) via [dbt-databricks](https://github.com/databricks/dbt-databricks) + +### Syncing of `dlt` state +This destination fully supports [dlt state sync](../../general-usage/state#syncing-state-with-destination). diff --git a/mypy.ini b/mypy.ini index 8a02cf80bd..d4da898a0f 100644 --- a/mypy.ini +++ b/mypy.ini @@ -104,4 +104,7 @@ ignore_missing_imports=true [mypy-s3fs.*] ignore_missing_imports=true [mypy-win_precise_time] +ignore_missing_imports=true + +[mypy-databricks.*] ignore_missing_imports=true \ No newline at end of file diff --git a/poetry.lock b/poetry.lock index c5da40c604..e8cb1eb1fe 100644 --- a/poetry.lock +++ b/poetry.lock @@ -1,4 +1,4 @@ -# This file is automatically @generated by Poetry 1.7.1 and should not be changed by hand. +# This file is automatically @generated by Poetry 1.6.1 and should not be changed by hand. [[package]] name = "about-time" @@ -35,27 +35,26 @@ docs = ["furo", "myst-parser", "numpydoc", "sphinx"] [[package]] name = "agate" -version = "1.6.3" +version = "1.7.1" description = "A data analysis library that is optimized for humans instead of machines." optional = false python-versions = "*" files = [ - {file = "agate-1.6.3-py2.py3-none-any.whl", hash = "sha256:2d568fd68a8eb8b56c805a1299ba4bc30ca0434563be1bea309c9d1c1c8401f4"}, - {file = "agate-1.6.3.tar.gz", hash = "sha256:e0f2f813f7e12311a4cdccc97d6ba0a6781e9c1aa8eca0ab00d5931c0113a308"}, + {file = "agate-1.7.1-py2.py3-none-any.whl", hash = "sha256:23f9f412f74f97b72f82b1525ab235cc816bc8c8525d968a091576a0dbc54a5f"}, + {file = "agate-1.7.1.tar.gz", hash = "sha256:eadf46d980168b8922d5d396d6258eecd5e7dbef7e6f0c0b71e968545ea96389"}, ] [package.dependencies] Babel = ">=2.0" isodate = ">=0.5.4" leather = ">=0.3.2" -parsedatetime = ">=2.1,<2.5 || >2.5,<2.6 || >2.6" +parsedatetime = ">=2.1,<2.5 || >2.5" python-slugify = ">=1.2.1" pytimeparse = ">=1.1.5" -six = ">=1.9.0" [package.extras] docs = ["Sphinx (>=1.2.2)", "sphinx-rtd-theme (>=0.1.6)"] -test = ["PyICU (>=2.4.2)", "coverage (>=3.7.1)", "cssselect (>=0.9.1)", "lxml (>=3.6.0)", "mock (>=1.3.0)", "nose (>=1.1.2)", "pytz (>=2015.4)", "unittest2 (>=1.1.0)"] +test = ["PyICU (>=2.4.2)", "coverage (>=3.7.1)", "cssselect (>=0.9.1)", "lxml (>=3.6.0)", "pytest", "pytest-cov", "pytz (>=2015.4)"] [[package]] name = "aiobotocore" @@ -1886,68 +1885,119 @@ nr-date = ">=2.0.0,<3.0.0" typeapi = ">=2.0.1,<3.0.0" typing-extensions = ">=3.10.0" +[[package]] +name = "databricks-sdk" +version = "0.17.0" +description = "Databricks SDK for Python (Beta)" +optional = true +python-versions = ">=3.7" +files = [ + {file = "databricks-sdk-0.17.0.tar.gz", hash = "sha256:0a1baa6783aba9b034b9a017da8d0cf839ec61ae8318792b78bfb3db0374dd9c"}, + {file = "databricks_sdk-0.17.0-py3-none-any.whl", hash = "sha256:ad90e01c7b1a9d60a3de6a35606c79ac982e8972d3ad3ff89c251c24439c8bb9"}, +] + +[package.dependencies] +google-auth = ">=2.0,<3.0" +requests = ">=2.28.1,<3" + +[package.extras] +dev = ["autoflake", "ipython", "ipywidgets", "isort", "pycodestyle", "pyfakefs", "pytest", 
"pytest-cov", "pytest-mock", "pytest-xdist", "requests-mock", "wheel", "yapf"] +notebook = ["ipython (>=8,<9)", "ipywidgets (>=8,<9)"] + +[[package]] +name = "databricks-sql-connector" +version = "2.9.3" +description = "Databricks SQL Connector for Python" +optional = true +python-versions = ">=3.7.1,<4.0.0" +files = [ + {file = "databricks_sql_connector-2.9.3-py3-none-any.whl", hash = "sha256:e37b5aa8bea22e84a9920e87ad9ba6cafbe656008c180a790baa53b711dd9889"}, + {file = "databricks_sql_connector-2.9.3.tar.gz", hash = "sha256:09a1686de3470091e78640de276053d4e18f8c03ba3627ed45b368f78bf87db9"}, +] + +[package.dependencies] +alembic = ">=1.0.11,<2.0.0" +lz4 = ">=4.0.2,<5.0.0" +numpy = [ + {version = ">=1.16.6", markers = "python_version >= \"3.7\" and python_version < \"3.11\""}, + {version = ">=1.23.4", markers = "python_version >= \"3.11\""}, +] +oauthlib = ">=3.1.0,<4.0.0" +openpyxl = ">=3.0.10,<4.0.0" +pandas = {version = ">=1.2.5,<3.0.0", markers = "python_version >= \"3.8\""} +pyarrow = [ + {version = ">=6.0.0", markers = "python_version >= \"3.7\" and python_version < \"3.11\""}, + {version = ">=10.0.1", markers = "python_version >= \"3.11\""}, +] +requests = ">=2.18.1,<3.0.0" +sqlalchemy = ">=1.3.24,<2.0.0" +thrift = ">=0.16.0,<0.17.0" +urllib3 = ">=1.0" + [[package]] name = "dbt-athena-community" -version = "1.5.2" +version = "1.7.1" description = "The athena adapter plugin for dbt (data build tool)" optional = true -python-versions = "*" +python-versions = ">=3.8" files = [ - {file = "dbt-athena-community-1.5.2.tar.gz", hash = "sha256:9acd333ddf33514769189a7a0b6219e13966d370098211cb1d022fa32e64671a"}, - {file = "dbt_athena_community-1.5.2-py3-none-any.whl", hash = "sha256:c9f0f8425500211a1c1deddce5aff5ed24fe08530f0ffad38e63de9c9b9f3ee6"}, + {file = "dbt-athena-community-1.7.1.tar.gz", hash = "sha256:02c7bc461628e2adbfaf9d3f51fbe9a5cb5e06ee2ea8329259758518ceafdc12"}, + {file = "dbt_athena_community-1.7.1-py3-none-any.whl", hash = "sha256:2a376fa128e2bd98cb774fcbf718ebe4fbc9cac7857aa037b9e36bec75448361"}, ] [package.dependencies] boto3 = ">=1.26,<2.0" boto3-stubs = {version = ">=1.26,<2.0", extras = ["athena", "glue", "lakeformation", "sts"]} -dbt-core = ">=1.5.0,<1.6.0" +dbt-core = ">=1.7.0,<1.8.0" +mmh3 = ">=4.0.1,<4.1.0" pyathena = ">=2.25,<4.0" pydantic = ">=1.10,<3.0" tenacity = ">=8.2,<9.0" [[package]] name = "dbt-bigquery" -version = "1.5.6" +version = "1.7.2" description = "The Bigquery adapter plugin for dbt" optional = true python-versions = ">=3.8" files = [ - {file = "dbt-bigquery-1.5.6.tar.gz", hash = "sha256:4655cf2ee0acda986b80e6c5d55cae57871bef22d868dfe29d8d4a5bca98a1ba"}, - {file = "dbt_bigquery-1.5.6-py3-none-any.whl", hash = "sha256:3f37544716880cbd17b32bc0c9728a0407b5615b2cd08e1bb904a7a83c46eb6c"}, + {file = "dbt-bigquery-1.7.2.tar.gz", hash = "sha256:27c7f492f65ab5d1d43432a4467a436fc3637e3cb72c5b4ab07ddf7573c43596"}, + {file = "dbt_bigquery-1.7.2-py3-none-any.whl", hash = "sha256:75015755363d9e8b8cebe190d59a5e08375032b37bcfec41ec8753e7dea29f6e"}, ] [package.dependencies] -agate = ">=1.6.3,<1.7.0" -dbt-core = ">=1.5.0,<1.6.0" +dbt-core = ">=1.7.0,<1.8.0" +google-api-core = ">=2.11.0" google-cloud-bigquery = ">=3.0,<4.0" google-cloud-dataproc = ">=5.0,<6.0" google-cloud-storage = ">=2.4,<3.0" [[package]] name = "dbt-core" -version = "1.5.6" +version = "1.7.4" description = "With dbt, data analysts and engineers can build analytics the way engineers build applications." 
optional = false -python-versions = ">=3.7.2" +python-versions = ">=3.8" files = [ - {file = "dbt-core-1.5.6.tar.gz", hash = "sha256:af3c03cd4a1fc92481362888014ca1ffed2ffef0b0e0d98463ad0f26c49ef458"}, - {file = "dbt_core-1.5.6-py3-none-any.whl", hash = "sha256:030d2179f9efbf8ccea079296d0c79278d963bb2475c0bcce9ca4bbb0d8c393c"}, + {file = "dbt-core-1.7.4.tar.gz", hash = "sha256:769b95949210cb0d1eafdb7be48b01e59984650403f86510fdee65bd0f70f76d"}, + {file = "dbt_core-1.7.4-py3-none-any.whl", hash = "sha256:50050ae44fe9bad63e1b639810ed3629822cdc7a2af0eff6e08461c94c4527c0"}, ] [package.dependencies] -agate = ">=1.6,<1.7.1" +agate = ">=1.7.0,<1.8.0" cffi = ">=1.9,<2.0.0" -click = "<9" -colorama = ">=0.3.9,<0.4.7" -dbt-extractor = ">=0.4.1,<0.5.0" -hologram = ">=0.0.14,<=0.0.16" +click = ">=8.0.2,<9" +colorama = ">=0.3.9,<0.5" +dbt-extractor = ">=0.5.0,<0.6.0" +dbt-semantic-interfaces = ">=0.4.2,<0.5.0" idna = ">=2.5,<4" isodate = ">=0.6,<0.7" -Jinja2 = "3.1.2" +Jinja2 = ">=3.1.2,<3.2.0" +jsonschema = ">=3.0" logbook = ">=1.5,<1.6" -mashumaro = {version = "3.6", extras = ["msgpack"]} -minimal-snowplow-tracker = "0.0.2" -networkx = {version = ">=2.3,<3", markers = "python_version >= \"3.8\""} +mashumaro = {version = ">=3.9,<4.0", extras = ["msgpack"]} +minimal-snowplow-tracker = ">=0.0.2,<0.1.0" +networkx = ">=2.3,<4" packaging = ">20.9" pathspec = ">=0.9,<0.12" protobuf = ">=4.0.0" @@ -1956,99 +2006,160 @@ pyyaml = ">=6.0" requests = "<3.0.0" sqlparse = ">=0.2.3,<0.5" typing-extensions = ">=3.7.4" -werkzeug = ">=1,<3" +urllib3 = ">=1.0,<2.0" + +[[package]] +name = "dbt-databricks" +version = "1.7.3" +description = "The Databricks adapter plugin for dbt" +optional = true +python-versions = ">=3.8" +files = [ + {file = "dbt-databricks-1.7.3.tar.gz", hash = "sha256:045e26240c825342259a59004c2e35e7773b0b6cbb255e6896bd46d3810f9607"}, + {file = "dbt_databricks-1.7.3-py3-none-any.whl", hash = "sha256:7c2b7bd7228a401d8262781749fc496c825fe6050e661e5ab3f1c66343e311cc"}, +] + +[package.dependencies] +databricks-sdk = ">=0.9.0" +databricks-sql-connector = ">=2.9.3,<3.0.0" +dbt-spark = "1.7.1" +keyring = ">=23.13.0" [[package]] name = "dbt-duckdb" -version = "1.5.2" +version = "1.7.1" description = "The duckdb adapter plugin for dbt (data build tool)" optional = false -python-versions = "*" +python-versions = ">=3.8" files = [ - {file = "dbt-duckdb-1.5.2.tar.gz", hash = "sha256:3407216c21bf78fd128dccfcff3ec4bf260fb145e633432015bc7d0f123e8e4b"}, - {file = "dbt_duckdb-1.5.2-py3-none-any.whl", hash = "sha256:5d18254807bbc3e61daf4f360208ad886adf44b8525e1998168290fbe73a5cbb"}, + {file = "dbt-duckdb-1.7.1.tar.gz", hash = "sha256:e59b3e58d7a461988d000892b75ce95245cdf899c847e3a430eb2e9e10e63bb9"}, + {file = "dbt_duckdb-1.7.1-py3-none-any.whl", hash = "sha256:bd75b1a72924b942794d0c3293a1159a01f21ab9d82c9f18b22c253dedad101a"}, ] [package.dependencies] -dbt-core = ">=1.5.0,<1.6.0" -duckdb = ">=0.5.0" +dbt-core = ">=1.7.0,<1.8.0" +duckdb = ">=0.7.0" [package.extras] glue = ["boto3", "mypy-boto3-glue"] [[package]] name = "dbt-extractor" -version = "0.4.1" +version = "0.5.1" description = "A tool to analyze and extract information from Jinja used in dbt projects." 
optional = false python-versions = ">=3.6.1" files = [ - {file = "dbt_extractor-0.4.1-cp36-abi3-macosx_10_7_x86_64.whl", hash = "sha256:4dc715bd740e418d8dc1dd418fea508e79208a24cf5ab110b0092a3cbe96bf71"}, - {file = "dbt_extractor-0.4.1-cp36-abi3-macosx_10_9_x86_64.macosx_11_0_arm64.macosx_10_9_universal2.whl", hash = "sha256:bc9e0050e3a2f4ea9fe58e8794bc808e6709a0c688ed710fc7c5b6ef3e5623ec"}, - {file = "dbt_extractor-0.4.1-cp36-abi3-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:76872cdee659075d6ce2df92dc62e59a74ba571be62acab2e297ca478b49d766"}, - {file = "dbt_extractor-0.4.1-cp36-abi3-manylinux_2_17_armv7l.manylinux2014_armv7l.whl", hash = "sha256:81435841610be1b07806d72cd89b1956c6e2a84c360b9ceb3f949c62a546d569"}, - {file = "dbt_extractor-0.4.1-cp36-abi3-manylinux_2_17_ppc64.manylinux2014_ppc64.whl", hash = "sha256:7c291f9f483eae4f60dd5859097d7ba51d5cb6c4725f08973ebd18cdea89d758"}, - {file = "dbt_extractor-0.4.1-cp36-abi3-manylinux_2_17_ppc64le.manylinux2014_ppc64le.whl", hash = "sha256:822b1e911db230e1b9701c99896578e711232001027b518c44c32f79a46fa3f9"}, - {file = "dbt_extractor-0.4.1-cp36-abi3-manylinux_2_17_s390x.manylinux2014_s390x.whl", hash = "sha256:554d27741a54599c39e5c0b7dbcab77400d83f908caba284a3e960db812e5814"}, - {file = "dbt_extractor-0.4.1-cp36-abi3-manylinux_2_5_i686.manylinux1_i686.whl", hash = "sha256:a805d51a25317f53cbff951c79b9cf75421cf48e4b3e1dfb3e9e8de6d824b76c"}, - {file = "dbt_extractor-0.4.1-cp36-abi3-manylinux_2_5_x86_64.manylinux1_x86_64.whl", hash = "sha256:cad90ddc708cb4182dc16fe2c87b1f088a1679877b93e641af068eb68a25d582"}, - {file = "dbt_extractor-0.4.1-cp36-abi3-musllinux_1_2_aarch64.whl", hash = "sha256:34783d788b133f223844e280e37b3f5244f2fb60acc457aa75c2667e418d5442"}, - {file = "dbt_extractor-0.4.1-cp36-abi3-musllinux_1_2_armv7l.whl", hash = "sha256:9da211869a1220ea55c5552c1567a3ea5233a6c52fa89ca87a22465481c37bc9"}, - {file = "dbt_extractor-0.4.1-cp36-abi3-musllinux_1_2_i686.whl", hash = "sha256:7d7c47774dc051b8c18690281a55e2e3d3320e823b17e04b06bc3ff81b1874ba"}, - {file = "dbt_extractor-0.4.1-cp36-abi3-musllinux_1_2_x86_64.whl", hash = "sha256:037907a7c7ae0391045d81338ca77ddaef899a91d80f09958f09fe374594e19b"}, - {file = "dbt_extractor-0.4.1-cp36-abi3-win32.whl", hash = "sha256:3fe8d8e28a7bd3e0884896147269ca0202ca432d8733113386bdc84c824561bf"}, - {file = "dbt_extractor-0.4.1-cp36-abi3-win_amd64.whl", hash = "sha256:35265a0ae0a250623b0c2e3308b2738dc8212e40e0aa88407849e9ea090bb312"}, - {file = "dbt_extractor-0.4.1.tar.gz", hash = "sha256:75b1c665699ec0f1ffce1ba3d776f7dfce802156f22e70a7b9c8f0b4d7e80f42"}, + {file = "dbt_extractor-0.5.1-cp38-abi3-macosx_10_12_x86_64.macosx_11_0_arm64.macosx_10_12_universal2.whl", hash = "sha256:3b91e6106b967d908b34f83929d3f50ee2b498876a1be9c055fe060ed728c556"}, + {file = "dbt_extractor-0.5.1-cp38-abi3-macosx_10_12_x86_64.whl", hash = "sha256:3614ce9f83ae4cd0dc95f77730034a793a1c090a52dcf698ba1c94050afe3a8b"}, + {file = "dbt_extractor-0.5.1-cp38-abi3-manylinux_2_12_i686.manylinux2010_i686.whl", hash = "sha256:ea4edf33035d0a060b1e01c42fb2d99316457d44c954d6ed4eed9f1948664d87"}, + {file = "dbt_extractor-0.5.1-cp38-abi3-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:d3b9bf50eb062b4344d9546fe42038996c6e7e7daa10724aa955d64717260e5d"}, + {file = "dbt_extractor-0.5.1-cp38-abi3-manylinux_2_17_armv7l.manylinux2014_armv7l.whl", hash = "sha256:c0ce901d4ebf0664977e4e1cbf596d4afc6c1339fcc7d2cf67ce3481566a626f"}, + {file = "dbt_extractor-0.5.1-cp38-abi3-manylinux_2_17_ppc64.manylinux2014_ppc64.whl", hash = 
"sha256:cbe338b76e9ffaa18275456e041af56c21bb517f6fbda7a58308138703da0996"}, + {file = "dbt_extractor-0.5.1-cp38-abi3-manylinux_2_17_ppc64le.manylinux2014_ppc64le.whl", hash = "sha256:1b25fa7a276ab26aa2d70ff6e0cf4cfb1490d7831fb57ee1337c24d2b0333b84"}, + {file = "dbt_extractor-0.5.1-cp38-abi3-manylinux_2_17_s390x.manylinux2014_s390x.whl", hash = "sha256:c5651e458be910ff567c0da3ea2eb084fd01884cc88888ac2cf1e240dcddacc2"}, + {file = "dbt_extractor-0.5.1-cp38-abi3-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:62e4f040fd338b652683421ce48e903812e27fd6e7af58b1b70a4e1f9f2c79e3"}, + {file = "dbt_extractor-0.5.1-cp38-abi3-musllinux_1_2_aarch64.whl", hash = "sha256:91e25ad78f1f4feadd27587ebbcc46ad909cfad843118908f30336d08d8400ca"}, + {file = "dbt_extractor-0.5.1-cp38-abi3-musllinux_1_2_armv7l.whl", hash = "sha256:cdf9938b36cd098bcdd80f43dc03864da3f69f57d903a9160a32236540d4ddcd"}, + {file = "dbt_extractor-0.5.1-cp38-abi3-musllinux_1_2_i686.whl", hash = "sha256:475e2c05b17eb4976eff6c8f7635be42bec33f15a74ceb87a40242c94a99cebf"}, + {file = "dbt_extractor-0.5.1-cp38-abi3-musllinux_1_2_x86_64.whl", hash = "sha256:100453ba06e169cbdb118234ab3f06f6722a2e0e316089b81c88dea701212abc"}, + {file = "dbt_extractor-0.5.1-cp38-abi3-win32.whl", hash = "sha256:6916aae085fd5f2af069fd6947933e78b742c9e3d2165e1740c2e28ae543309a"}, + {file = "dbt_extractor-0.5.1-cp38-abi3-win_amd64.whl", hash = "sha256:eecc08f3743e802a8ede60c89f7b2bce872acc86120cbc0ae7df229bb8a95083"}, + {file = "dbt_extractor-0.5.1.tar.gz", hash = "sha256:cd5d95576a8dea4190240aaf9936a37fd74b4b7913ca69a3c368fc4472bb7e13"}, ] [[package]] name = "dbt-postgres" -version = "1.5.6" +version = "1.7.4" description = "The postgres adapter plugin for dbt (data build tool)" optional = true -python-versions = ">=3.7" +python-versions = ">=3.8" files = [ - {file = "dbt-postgres-1.5.6.tar.gz", hash = "sha256:b74e471dc661819a3d4bda2d11497935661ac2e25786c8a5b7314d8241b18582"}, - {file = "dbt_postgres-1.5.6-py3-none-any.whl", hash = "sha256:bc5711c9ab0ec4b57ab814b2c4e4c973554c8374b7da94b06814ac81c91f67ef"}, + {file = "dbt-postgres-1.7.4.tar.gz", hash = "sha256:16185b8de36d1a2052a2e4b85512306ab55085b1ea323a353d0dc3628473208d"}, + {file = "dbt_postgres-1.7.4-py3-none-any.whl", hash = "sha256:d414b070ca5e48925ea9ab12706bbb9e2294f7d4509c28e7af42268596334044"}, ] [package.dependencies] -dbt-core = "1.5.6" +agate = "*" +dbt-core = "1.7.4" psycopg2-binary = ">=2.8,<3.0" [[package]] name = "dbt-redshift" -version = "1.5.10" +version = "1.7.1" description = "The Redshift adapter plugin for dbt" optional = true python-versions = ">=3.8" files = [ - {file = "dbt-redshift-1.5.10.tar.gz", hash = "sha256:2b9ae1a7d05349e208b0937cd7cc920ea427341ef96096021b18e4070e927f5c"}, - {file = "dbt_redshift-1.5.10-py3-none-any.whl", hash = "sha256:b7689b043535b6b0d217c2abfe924db2336beaae71f3f36ab9aa1e920d2bb2e0"}, + {file = "dbt-redshift-1.7.1.tar.gz", hash = "sha256:6da69a83038d011570d131b85171842d0858a46bca3757419ae193b5724a2119"}, + {file = "dbt_redshift-1.7.1-py3-none-any.whl", hash = "sha256:2a48b9424934f5445e4285740ebe512afaa75882138121536ccc21d027ef62f2"}, ] [package.dependencies] agate = "*" -boto3 = ">=1.26.157,<1.27.0" -dbt-core = ">=1.5.0,<1.6.0" -dbt-postgres = ">=1.5.0,<1.6.0" -redshift-connector = "2.0.913" +dbt-core = ">=1.7.0,<1.8.0" +dbt-postgres = ">=1.7.0,<1.8.0" +redshift-connector = "2.0.915" + +[[package]] +name = "dbt-semantic-interfaces" +version = "0.4.3" +description = "The shared semantic layer definitions that dbt-core and MetricFlow use" +optional = false 
+python-versions = ">=3.8" +files = [ + {file = "dbt_semantic_interfaces-0.4.3-py3-none-any.whl", hash = "sha256:af6ab8509da81ae5f5f1d5631c9761cccaed8cd5311d4824a8d4168ecd0f2093"}, + {file = "dbt_semantic_interfaces-0.4.3.tar.gz", hash = "sha256:9a46d07ad022a4c48783565a776ebc6f1d19e0412e70c4759bc9d7bba461ea1c"}, +] + +[package.dependencies] +click = ">=7.0,<9.0" +importlib-metadata = ">=6.0,<7.0" +jinja2 = ">=3.0,<4.0" +jsonschema = ">=4.0,<5.0" +more-itertools = ">=8.0,<11.0" +pydantic = ">=1.10,<3" +python-dateutil = ">=2.0,<3.0" +pyyaml = ">=6.0,<7.0" +typing-extensions = ">=4.4,<5.0" [[package]] name = "dbt-snowflake" -version = "1.5.3" +version = "1.7.1" description = "The Snowflake adapter plugin for dbt" optional = true python-versions = ">=3.8" files = [ - {file = "dbt-snowflake-1.5.3.tar.gz", hash = "sha256:cf42772d2c2f1e29a2a64b039c66d80a8593f52a2dd711a144d43b4175802f9a"}, - {file = "dbt_snowflake-1.5.3-py3-none-any.whl", hash = "sha256:8aaa939d834798e5bb10a3ba4f52fc32a53e6e5568d6c0e8b3ac644f099972ff"}, + {file = "dbt-snowflake-1.7.1.tar.gz", hash = "sha256:842a9e87b9e2d999e3bc27aaa369398a4d02bb3f8bb7447aa6151204d4eb90f0"}, + {file = "dbt_snowflake-1.7.1-py3-none-any.whl", hash = "sha256:32ef8733f67dcf4eb594d1b80852ef0b67e920f25bb8a2953031a3868a8d2b3e"}, ] [package.dependencies] -dbt-core = ">=1.5.0,<1.6.0" +agate = "*" +dbt-core = ">=1.7.0,<1.8.0" snowflake-connector-python = {version = ">=3.0,<4.0", extras = ["secure-local-storage"]} +[[package]] +name = "dbt-spark" +version = "1.7.1" +description = "The Apache Spark adapter plugin for dbt" +optional = true +python-versions = ">=3.8" +files = [ + {file = "dbt-spark-1.7.1.tar.gz", hash = "sha256:a10e5d1bfdb2ca98e7ae2badd06150e2695d9d4fa18ae2354ed5bd093d77f947"}, + {file = "dbt_spark-1.7.1-py3-none-any.whl", hash = "sha256:99b5002edcdb82058a3b0ad33eb18b91a4bdde887d94855e8bd6f633d78837dc"}, +] + +[package.dependencies] +dbt-core = ">=1.7.0,<1.8.0" +sqlparams = ">=3.0.0" + +[package.extras] +all = ["PyHive[hive-pure-sasl] (>=0.7.0,<0.8.0)", "pyodbc (>=4.0.39,<4.1.0)", "pyspark (>=3.0.0,<4.0.0)", "thrift (>=0.11.0,<0.17.0)"] +odbc = ["pyodbc (>=4.0.39,<4.1.0)"] +pyhive = ["PyHive[hive-pure-sasl] (>=0.7.0,<0.8.0)", "thrift (>=0.11.0,<0.17.0)"] +session = ["pyspark (>=3.0.0,<4.0.0)"] + [[package]] name = "decopatch" version = "1.4.10" @@ -2298,6 +2409,17 @@ files = [ blessed = ">=1.17.7" prefixed = ">=0.3.2" +[[package]] +name = "et-xmlfile" +version = "1.1.0" +description = "An implementation of lxml.xmlfile for the standard library" +optional = true +python-versions = ">=3.6" +files = [ + {file = "et_xmlfile-1.1.0-py3-none-any.whl", hash = "sha256:a2ba85d1d6a74ef63837eed693bcb89c3f752169b0e3e7ae5b16ca5e1b3deada"}, + {file = "et_xmlfile-1.1.0.tar.gz", hash = "sha256:8eb9e2bc2f8c97e37a2dc85a09ecdcdec9d8a396530a6d5a33b30b9a92da0c5c"}, +] + [[package]] name = "exceptiongroup" version = "1.1.3" @@ -3639,21 +3761,6 @@ doc = ["sphinx (>=5.0.0)", "sphinx-rtd-theme (>=1.0.0)", "towncrier (>=21,<22)"] lint = ["black (>=22)", "flake8 (==6.0.0)", "flake8-bugbear (==23.3.23)", "isort (>=5.10.1)", "mypy (==0.971)", "pydocstyle (>=5.0.0)"] test = ["eth-utils (>=1.0.1,<3)", "hypothesis (>=3.44.24,<=6.31.6)", "pytest (>=7.0.0)", "pytest-xdist (>=2.4.0)"] -[[package]] -name = "hologram" -version = "0.0.16" -description = "JSON schema generation from dataclasses" -optional = false -python-versions = "*" -files = [ - {file = "hologram-0.0.16-py3-none-any.whl", hash = "sha256:4e56bd525336bb64a18916f871977a4125b64be8aaa750233583003333cda361"}, - {file = 
"hologram-0.0.16.tar.gz", hash = "sha256:1c2c921b4e575361623ea0e0d0aa5aee377b1a333cc6c6a879e213ed34583e55"}, -] - -[package.dependencies] -jsonschema = ">=3.0" -python-dateutil = ">=2.8,<2.9" - [[package]] name = "hpack" version = "4.0.0" @@ -3776,22 +3883,22 @@ files = [ [[package]] name = "importlib-metadata" -version = "4.13.0" +version = "6.11.0" description = "Read metadata from Python packages" optional = false -python-versions = ">=3.7" +python-versions = ">=3.8" files = [ - {file = "importlib_metadata-4.13.0-py3-none-any.whl", hash = "sha256:8a8a81bcf996e74fee46f0d16bd3eaa382a7eb20fd82445c3ad11f4090334116"}, - {file = "importlib_metadata-4.13.0.tar.gz", hash = "sha256:dd0173e8f150d6815e098fd354f6414b0f079af4644ddfe90c71e2fc6174346d"}, + {file = "importlib_metadata-6.11.0-py3-none-any.whl", hash = "sha256:f0afba6205ad8f8947c7d338b5342d5db2afbfd82f9cbef7879a9539cc12eb9b"}, + {file = "importlib_metadata-6.11.0.tar.gz", hash = "sha256:1231cf92d825c9e03cfc4da076a16de6422c863558229ea0b22b675657463443"}, ] [package.dependencies] zipp = ">=0.5" [package.extras] -docs = ["furo", "jaraco.packaging (>=9)", "jaraco.tidelift (>=1.4)", "rst.linker (>=1.9)", "sphinx (>=3.5)"] +docs = ["furo", "jaraco.packaging (>=9.3)", "jaraco.tidelift (>=1.4)", "rst.linker (>=1.9)", "sphinx (<7.2.5)", "sphinx (>=3.5)", "sphinx-lint"] perf = ["ipython"] -testing = ["flake8 (<5)", "flufl.flake8", "importlib-resources (>=1.3)", "packaging", "pyfakefs", "pytest (>=6)", "pytest-black (>=0.3.7)", "pytest-checkdocs (>=2.4)", "pytest-cov", "pytest-enabler (>=1.3)", "pytest-flake8", "pytest-mypy (>=0.9.1)", "pytest-perf (>=0.9.2)"] +testing = ["flufl.flake8", "importlib-resources (>=1.3)", "packaging", "pyfakefs", "pytest (>=6)", "pytest-black (>=0.3.7)", "pytest-checkdocs (>=2.4)", "pytest-cov", "pytest-enabler (>=2.2)", "pytest-mypy (>=0.9.1)", "pytest-perf (>=0.9.2)", "pytest-ruff"] [[package]] name = "importlib-resources" @@ -4284,6 +4391,56 @@ html5 = ["html5lib"] htmlsoup = ["BeautifulSoup4"] source = ["Cython (>=0.29.35)"] +[[package]] +name = "lz4" +version = "4.3.3" +description = "LZ4 Bindings for Python" +optional = true +python-versions = ">=3.8" +files = [ + {file = "lz4-4.3.3-cp310-cp310-macosx_10_9_x86_64.whl", hash = "sha256:b891880c187e96339474af2a3b2bfb11a8e4732ff5034be919aa9029484cd201"}, + {file = "lz4-4.3.3-cp310-cp310-macosx_11_0_arm64.whl", hash = "sha256:222a7e35137d7539c9c33bb53fcbb26510c5748779364014235afc62b0ec797f"}, + {file = "lz4-4.3.3-cp310-cp310-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:f76176492ff082657ada0d0f10c794b6da5800249ef1692b35cf49b1e93e8ef7"}, + {file = "lz4-4.3.3-cp310-cp310-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:f1d18718f9d78182c6b60f568c9a9cec8a7204d7cb6fad4e511a2ef279e4cb05"}, + {file = "lz4-4.3.3-cp310-cp310-manylinux_2_5_i686.manylinux1_i686.manylinux_2_17_i686.manylinux2014_i686.whl", hash = "sha256:6cdc60e21ec70266947a48839b437d46025076eb4b12c76bd47f8e5eb8a75dcc"}, + {file = "lz4-4.3.3-cp310-cp310-win32.whl", hash = "sha256:c81703b12475da73a5d66618856d04b1307e43428a7e59d98cfe5a5d608a74c6"}, + {file = "lz4-4.3.3-cp310-cp310-win_amd64.whl", hash = "sha256:43cf03059c0f941b772c8aeb42a0813d68d7081c009542301637e5782f8a33e2"}, + {file = "lz4-4.3.3-cp311-cp311-macosx_10_9_x86_64.whl", hash = "sha256:30e8c20b8857adef7be045c65f47ab1e2c4fabba86a9fa9a997d7674a31ea6b6"}, + {file = "lz4-4.3.3-cp311-cp311-macosx_11_0_arm64.whl", hash = "sha256:2f7b1839f795315e480fb87d9bc60b186a98e3e5d17203c6e757611ef7dcef61"}, + {file = 
"lz4-4.3.3-cp311-cp311-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:edfd858985c23523f4e5a7526ca6ee65ff930207a7ec8a8f57a01eae506aaee7"}, + {file = "lz4-4.3.3-cp311-cp311-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:0e9c410b11a31dbdc94c05ac3c480cb4b222460faf9231f12538d0074e56c563"}, + {file = "lz4-4.3.3-cp311-cp311-manylinux_2_5_i686.manylinux1_i686.manylinux_2_17_i686.manylinux2014_i686.whl", hash = "sha256:d2507ee9c99dbddd191c86f0e0c8b724c76d26b0602db9ea23232304382e1f21"}, + {file = "lz4-4.3.3-cp311-cp311-win32.whl", hash = "sha256:f180904f33bdd1e92967923a43c22899e303906d19b2cf8bb547db6653ea6e7d"}, + {file = "lz4-4.3.3-cp311-cp311-win_amd64.whl", hash = "sha256:b14d948e6dce389f9a7afc666d60dd1e35fa2138a8ec5306d30cd2e30d36b40c"}, + {file = "lz4-4.3.3-cp312-cp312-macosx_10_9_x86_64.whl", hash = "sha256:e36cd7b9d4d920d3bfc2369840da506fa68258f7bb176b8743189793c055e43d"}, + {file = "lz4-4.3.3-cp312-cp312-macosx_11_0_arm64.whl", hash = "sha256:31ea4be9d0059c00b2572d700bf2c1bc82f241f2c3282034a759c9a4d6ca4dc2"}, + {file = "lz4-4.3.3-cp312-cp312-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:33c9a6fd20767ccaf70649982f8f3eeb0884035c150c0b818ea660152cf3c809"}, + {file = "lz4-4.3.3-cp312-cp312-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:bca8fccc15e3add173da91be8f34121578dc777711ffd98d399be35487c934bf"}, + {file = "lz4-4.3.3-cp312-cp312-manylinux_2_5_i686.manylinux1_i686.manylinux_2_17_i686.manylinux2014_i686.whl", hash = "sha256:e7d84b479ddf39fe3ea05387f10b779155fc0990125f4fb35d636114e1c63a2e"}, + {file = "lz4-4.3.3-cp312-cp312-win32.whl", hash = "sha256:337cb94488a1b060ef1685187d6ad4ba8bc61d26d631d7ba909ee984ea736be1"}, + {file = "lz4-4.3.3-cp312-cp312-win_amd64.whl", hash = "sha256:5d35533bf2cee56f38ced91f766cd0038b6abf46f438a80d50c52750088be93f"}, + {file = "lz4-4.3.3-cp38-cp38-macosx_10_9_x86_64.whl", hash = "sha256:363ab65bf31338eb364062a15f302fc0fab0a49426051429866d71c793c23394"}, + {file = "lz4-4.3.3-cp38-cp38-macosx_11_0_arm64.whl", hash = "sha256:0a136e44a16fc98b1abc404fbabf7f1fada2bdab6a7e970974fb81cf55b636d0"}, + {file = "lz4-4.3.3-cp38-cp38-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:abc197e4aca8b63f5ae200af03eb95fb4b5055a8f990079b5bdf042f568469dd"}, + {file = "lz4-4.3.3-cp38-cp38-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:56f4fe9c6327adb97406f27a66420b22ce02d71a5c365c48d6b656b4aaeb7775"}, + {file = "lz4-4.3.3-cp38-cp38-manylinux_2_5_i686.manylinux1_i686.manylinux_2_17_i686.manylinux2014_i686.whl", hash = "sha256:f0e822cd7644995d9ba248cb4b67859701748a93e2ab7fc9bc18c599a52e4604"}, + {file = "lz4-4.3.3-cp38-cp38-win32.whl", hash = "sha256:24b3206de56b7a537eda3a8123c644a2b7bf111f0af53bc14bed90ce5562d1aa"}, + {file = "lz4-4.3.3-cp38-cp38-win_amd64.whl", hash = "sha256:b47839b53956e2737229d70714f1d75f33e8ac26e52c267f0197b3189ca6de24"}, + {file = "lz4-4.3.3-cp39-cp39-macosx_10_9_x86_64.whl", hash = "sha256:6756212507405f270b66b3ff7f564618de0606395c0fe10a7ae2ffcbbe0b1fba"}, + {file = "lz4-4.3.3-cp39-cp39-macosx_11_0_arm64.whl", hash = "sha256:ee9ff50557a942d187ec85462bb0960207e7ec5b19b3b48949263993771c6205"}, + {file = "lz4-4.3.3-cp39-cp39-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:2b901c7784caac9a1ded4555258207d9e9697e746cc8532129f150ffe1f6ba0d"}, + {file = "lz4-4.3.3-cp39-cp39-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:b6d9ec061b9eca86e4dcc003d93334b95d53909afd5a32c6e4f222157b50c071"}, + {file = 
"lz4-4.3.3-cp39-cp39-manylinux_2_5_i686.manylinux1_i686.manylinux_2_17_i686.manylinux2014_i686.whl", hash = "sha256:f4c7bf687303ca47d69f9f0133274958fd672efaa33fb5bcde467862d6c621f0"}, + {file = "lz4-4.3.3-cp39-cp39-win32.whl", hash = "sha256:054b4631a355606e99a42396f5db4d22046a3397ffc3269a348ec41eaebd69d2"}, + {file = "lz4-4.3.3-cp39-cp39-win_amd64.whl", hash = "sha256:eac9af361e0d98335a02ff12fb56caeb7ea1196cf1a49dbf6f17828a131da807"}, + {file = "lz4-4.3.3.tar.gz", hash = "sha256:01fe674ef2889dbb9899d8a67361e0c4a2c833af5aeb37dd505727cf5d2a131e"}, +] + +[package.extras] +docs = ["sphinx (>=1.6.0)", "sphinx-bootstrap-theme"] +flake8 = ["flake8"] +tests = ["psutil", "pytest (!=3.3.0)", "pytest-cov"] + [[package]] name = "makefun" version = "1.15.1" @@ -4487,13 +4644,13 @@ tests = ["pytest", "pytest-lazy-fixture"] [[package]] name = "mashumaro" -version = "3.6" -description = "Fast serialization library on top of dataclasses" +version = "3.11" +description = "Fast and well tested serialization library" optional = false -python-versions = ">=3.7" +python-versions = ">=3.8" files = [ - {file = "mashumaro-3.6-py3-none-any.whl", hash = "sha256:77403e3e2ecd0a7d0e22d472c08e33282460e48726eabe356c5163efbdf9c7ee"}, - {file = "mashumaro-3.6.tar.gz", hash = "sha256:ceb3de53029219bbbb0385ca600b59348dcd14e0c68523986c6d51889ad338f5"}, + {file = "mashumaro-3.11-py3-none-any.whl", hash = "sha256:8f858bdb33790db6d9f3087dce793a26d109aeae38bed3ca9c2d7f16f19db412"}, + {file = "mashumaro-3.11.tar.gz", hash = "sha256:b0b2443be4bdad29bb209d91fe4a2a918fbd7b63cccfeb457c7eeb567db02f5e"}, ] [package.dependencies] @@ -4561,11 +4718,87 @@ files = [ requests = ">=2.2.1,<3.0" six = ">=1.9.0,<2.0" +[[package]] +name = "mmh3" +version = "4.0.1" +description = "Python extension for MurmurHash (MurmurHash3), a set of fast and robust hash functions." 
+optional = true +python-versions = "*" +files = [ + {file = "mmh3-4.0.1-cp310-cp310-macosx_10_9_universal2.whl", hash = "sha256:b719ba87232749095011d567a36a25e40ed029fc61c47e74a12416d8bb60b311"}, + {file = "mmh3-4.0.1-cp310-cp310-macosx_10_9_x86_64.whl", hash = "sha256:f0ad423711c5096cf4a346011f3b3ec763208e4f4cc4b10ed41cad2a03dbfaed"}, + {file = "mmh3-4.0.1-cp310-cp310-macosx_11_0_arm64.whl", hash = "sha256:80918e3f8ab6b717af0a388c14ffac5a89c15d827ff008c1ef545b8b32724116"}, + {file = "mmh3-4.0.1-cp310-cp310-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:8222cd5f147defa1355b4042d590c34cef9b2bb173a159fcb72cda204061a4ac"}, + {file = "mmh3-4.0.1-cp310-cp310-manylinux_2_17_ppc64le.manylinux2014_ppc64le.whl", hash = "sha256:3821bcd1961ef19247c78c5d01b5a759de82ab0c023e2ff1d5ceed74322fa018"}, + {file = "mmh3-4.0.1-cp310-cp310-manylinux_2_17_s390x.manylinux2014_s390x.whl", hash = "sha256:59f7ed28c24249a54665f1ed3f6c7c1c56618473381080f79bcc0bd1d1db2e4a"}, + {file = "mmh3-4.0.1-cp310-cp310-manylinux_2_5_i686.manylinux1_i686.manylinux_2_17_i686.manylinux2014_i686.whl", hash = "sha256:dacd8d07d4b9be8f0cb6e8fd9a08fc237c18578cf8d42370ee8af2f5a2bf1967"}, + {file = "mmh3-4.0.1-cp310-cp310-manylinux_2_5_x86_64.manylinux1_x86_64.manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:5cd00883ef6bcf7831026ce42e773a4b2a4f3a7bf9003a4e781fecb1144b06c1"}, + {file = "mmh3-4.0.1-cp310-cp310-musllinux_1_1_aarch64.whl", hash = "sha256:df73d1c7f0c50c0f8061cd349968fd9dcc6a9e7592d1c834fa898f9c98f8dd7e"}, + {file = "mmh3-4.0.1-cp310-cp310-musllinux_1_1_i686.whl", hash = "sha256:f41eeae98f15af0a4ba2a92bce11d8505b612012af664a7634bbfdba7096f5fc"}, + {file = "mmh3-4.0.1-cp310-cp310-musllinux_1_1_ppc64le.whl", hash = "sha256:ce9bb622e9f1162cafd033071b32ac495c5e8d5863fca2a5144c092a0f129a5b"}, + {file = "mmh3-4.0.1-cp310-cp310-musllinux_1_1_s390x.whl", hash = "sha256:dd92e0ff9edee6af960d9862a3e519d651e6344321fd280fb082654fc96ecc4d"}, + {file = "mmh3-4.0.1-cp310-cp310-musllinux_1_1_x86_64.whl", hash = "sha256:1aefa8ac8c8fc8ad93365477baef2125dbfd7235880a9c47dca2c46a0af49ef7"}, + {file = "mmh3-4.0.1-cp310-cp310-win32.whl", hash = "sha256:a076ea30ec279a63f44f4c203e4547b5710d00581165fed12583d2017139468d"}, + {file = "mmh3-4.0.1-cp310-cp310-win_amd64.whl", hash = "sha256:5aa1e87e448ee1ffa3737b72f2fe3f5960159ab75bbac2f49dca6fb9797132f6"}, + {file = "mmh3-4.0.1-cp310-cp310-win_arm64.whl", hash = "sha256:45155ff2f291c3a1503d1c93e539ab025a13fd8b3f2868650140702b8bd7bfc2"}, + {file = "mmh3-4.0.1-cp311-cp311-macosx_10_9_universal2.whl", hash = "sha256:91f81d6dd4d0c3b4235b4a58a545493c946669c751a2e0f15084171dc2d81fee"}, + {file = "mmh3-4.0.1-cp311-cp311-macosx_10_9_x86_64.whl", hash = "sha256:bbfddaf55207798f5b29341e5b3a24dbff91711c51b1665eabc9d910255a78f0"}, + {file = "mmh3-4.0.1-cp311-cp311-macosx_11_0_arm64.whl", hash = "sha256:0deb8e19121c0896fdc709209aceda30a367cda47f4a884fcbe56223dbf9e867"}, + {file = "mmh3-4.0.1-cp311-cp311-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:df468ac7b61ec7251d7499e27102899ca39d87686f659baf47f84323f8f4541f"}, + {file = "mmh3-4.0.1-cp311-cp311-manylinux_2_17_ppc64le.manylinux2014_ppc64le.whl", hash = "sha256:84936c113814c6ef3bc4bd3d54f538d7ba312d1d0c2441ac35fdd7d5221c60f6"}, + {file = "mmh3-4.0.1-cp311-cp311-manylinux_2_17_s390x.manylinux2014_s390x.whl", hash = "sha256:8b1df3cf5ce5786aa093f45462118d87ff485f0d69699cdc34f6289b1e833632"}, + {file = "mmh3-4.0.1-cp311-cp311-manylinux_2_5_i686.manylinux1_i686.manylinux_2_17_i686.manylinux2014_i686.whl", hash = 
"sha256:da281aa740aa9e7f9bebb879c1de0ea9366687ece5930f9f5027e7c87d018153"}, + {file = "mmh3-4.0.1-cp311-cp311-manylinux_2_5_x86_64.manylinux1_x86_64.manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:0ec380933a56eb9fea16d7fcd49f1b5a5c92d7d2b86f25e9a845b72758ee8c42"}, + {file = "mmh3-4.0.1-cp311-cp311-musllinux_1_1_aarch64.whl", hash = "sha256:2fa905fcec8a30e1c0ef522afae1d6170c4f08e6a88010a582f67c59209fb7c7"}, + {file = "mmh3-4.0.1-cp311-cp311-musllinux_1_1_i686.whl", hash = "sha256:9b23a06315a65ef0b78da0be32409cfce0d6d83e51d70dcebd3302a61e4d34ce"}, + {file = "mmh3-4.0.1-cp311-cp311-musllinux_1_1_ppc64le.whl", hash = "sha256:36c27089b12026db14be594d750f7ea6d5d785713b40a971b063f033f5354a74"}, + {file = "mmh3-4.0.1-cp311-cp311-musllinux_1_1_s390x.whl", hash = "sha256:6338341ae6fa5eaa46f69ed9ac3e34e8eecad187b211a6e552e0d8128c568eb1"}, + {file = "mmh3-4.0.1-cp311-cp311-musllinux_1_1_x86_64.whl", hash = "sha256:1aece29e27d0c8fb489d00bb712fba18b4dd10e39c9aec2e216c779ae6400b8f"}, + {file = "mmh3-4.0.1-cp311-cp311-win32.whl", hash = "sha256:2733e2160c142eed359e25e5529915964a693f0d043165b53933f904a731c1b3"}, + {file = "mmh3-4.0.1-cp311-cp311-win_amd64.whl", hash = "sha256:09f9f643e0b7f8d98473efdfcdb155105824a38a1ada374625b84c1208197a9b"}, + {file = "mmh3-4.0.1-cp311-cp311-win_arm64.whl", hash = "sha256:d93422f38bc9c4d808c5438a011b769935a87df92ce277e9e22b6ec0ae8ed2e2"}, + {file = "mmh3-4.0.1-cp38-cp38-macosx_10_9_universal2.whl", hash = "sha256:41013c033dc446d3bfb573621b8b53223adcfcf07be1da0bcbe166d930276882"}, + {file = "mmh3-4.0.1-cp38-cp38-macosx_10_9_x86_64.whl", hash = "sha256:be46540eac024dd8d9b82899d35b2f23592d3d3850845aba6f10e6127d93246b"}, + {file = "mmh3-4.0.1-cp38-cp38-macosx_11_0_arm64.whl", hash = "sha256:0e64114b30c6c1e30f8201433b5fa6108a74a5d6f1a14af1b041360c0dd056aa"}, + {file = "mmh3-4.0.1-cp38-cp38-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:275637ecca755565e3b0505d3ecf8e1e0a51eb6a3cbe6e212ed40943f92f98cd"}, + {file = "mmh3-4.0.1-cp38-cp38-manylinux_2_17_ppc64le.manylinux2014_ppc64le.whl", hash = "sha256:955178c8e8d3bc9ad18eab443af670cd13fe18a6b2dba16db2a2a0632be8a133"}, + {file = "mmh3-4.0.1-cp38-cp38-manylinux_2_17_s390x.manylinux2014_s390x.whl", hash = "sha256:750afe0477e0c17904611045ad311ff10bc6c2ec5f5ddc5dd949a2b9bf71d5d5"}, + {file = "mmh3-4.0.1-cp38-cp38-manylinux_2_5_i686.manylinux1_i686.manylinux_2_17_i686.manylinux2014_i686.whl", hash = "sha256:0b7c18c35e9d6a59d6c5f94a6576f800ff2b500e41cd152ecfc7bb4330f32ba2"}, + {file = "mmh3-4.0.1-cp38-cp38-manylinux_2_5_x86_64.manylinux1_x86_64.manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:5b8635b1fc6b25d93458472c5d682a1a4b9e6c53e7f4ca75d2bf2a18fa9363ae"}, + {file = "mmh3-4.0.1-cp38-cp38-musllinux_1_1_aarch64.whl", hash = "sha256:057b8de47adee8ad0f2e194ffa445b9845263c1c367ddb335e9ae19c011b25cc"}, + {file = "mmh3-4.0.1-cp38-cp38-musllinux_1_1_i686.whl", hash = "sha256:78c0ee0197cfc912f57172aa16e784ad55b533e2e2e91b3a65188cc66fbb1b6e"}, + {file = "mmh3-4.0.1-cp38-cp38-musllinux_1_1_ppc64le.whl", hash = "sha256:d6acb15137467592691e41e6f897db1d2823ff3283111e316aa931ac0b5a5709"}, + {file = "mmh3-4.0.1-cp38-cp38-musllinux_1_1_s390x.whl", hash = "sha256:f91b2598e1f25e013da070ff641a29ebda76292d3a7bdd20ef1736e9baf0de67"}, + {file = "mmh3-4.0.1-cp38-cp38-musllinux_1_1_x86_64.whl", hash = "sha256:a78f6f2592395321e2f0dc6b618773398b2c9b15becb419364e0960df53e9f04"}, + {file = "mmh3-4.0.1-cp38-cp38-win32.whl", hash = "sha256:d8650982d0b70af24700bd32b15fab33bb3ef9be4af411100f4960a938b0dd0f"}, + {file 
= "mmh3-4.0.1-cp38-cp38-win_amd64.whl", hash = "sha256:2489949c7261870a02eeaa2ec7b966881c1775df847c8ce6ea4de3e9d96b5f4f"}, + {file = "mmh3-4.0.1-cp39-cp39-macosx_10_9_universal2.whl", hash = "sha256:dcd03a4bb0fa3db03648d26fb221768862f089b6aec5272f0df782a8b4fe5b5b"}, + {file = "mmh3-4.0.1-cp39-cp39-macosx_10_9_x86_64.whl", hash = "sha256:3775fb0cc675977e5b506b12b8f23cd220be3d4c2d4db7df81f03c9f61baa4cc"}, + {file = "mmh3-4.0.1-cp39-cp39-macosx_11_0_arm64.whl", hash = "sha256:8f250f78328d41cdf73d3ad9809359636f4fb7a846d7a6586e1a0f0d2f5f2590"}, + {file = "mmh3-4.0.1-cp39-cp39-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:4161009c9077d5ebf8b472dbf0f41b9139b3d380e0bbe71bf9b503efb2965584"}, + {file = "mmh3-4.0.1-cp39-cp39-manylinux_2_17_ppc64le.manylinux2014_ppc64le.whl", hash = "sha256:2cf986ebf530717fefeee8d0decbf3f359812caebba985e2c8885c0ce7c2ee4e"}, + {file = "mmh3-4.0.1-cp39-cp39-manylinux_2_17_s390x.manylinux2014_s390x.whl", hash = "sha256:3b55741ed51e928b1eec94a119e003fa3bc0139f4f9802e19bea3af03f7dd55a"}, + {file = "mmh3-4.0.1-cp39-cp39-manylinux_2_5_i686.manylinux1_i686.manylinux_2_17_i686.manylinux2014_i686.whl", hash = "sha256:d8250375641b8c5ce5d56a00c6bb29f583516389b8bde0023181d5eba8aa4119"}, + {file = "mmh3-4.0.1-cp39-cp39-manylinux_2_5_x86_64.manylinux1_x86_64.manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:29373e802bc094ffd490e39047bac372ac893c0f411dac3223ef11775e34acd0"}, + {file = "mmh3-4.0.1-cp39-cp39-musllinux_1_1_aarch64.whl", hash = "sha256:071ba41e56f5c385d13ee84b288ccaf46b70cd9e9a6d8cbcbe0964dee68c0019"}, + {file = "mmh3-4.0.1-cp39-cp39-musllinux_1_1_i686.whl", hash = "sha256:909e0b88d2c6285481fa6895c2a0faf6384e1b0093f72791aa57d1e04f4adc65"}, + {file = "mmh3-4.0.1-cp39-cp39-musllinux_1_1_ppc64le.whl", hash = "sha256:51d356f4380f9d9c2a0612156c3d1e7359933991e84a19304440aa04fd723e68"}, + {file = "mmh3-4.0.1-cp39-cp39-musllinux_1_1_s390x.whl", hash = "sha256:c4b2549949efa63d8decb6572f7e75fad4f2375d52fafced674323239dd9812d"}, + {file = "mmh3-4.0.1-cp39-cp39-musllinux_1_1_x86_64.whl", hash = "sha256:9bcc7b32a89c4e5c6fdef97d82e8087ba26a20c25b4aaf0723abd0b302525934"}, + {file = "mmh3-4.0.1-cp39-cp39-win32.whl", hash = "sha256:8edee21ae4f4337fb970810ef5a263e5d2212b85daca0d39daf995e13380e908"}, + {file = "mmh3-4.0.1-cp39-cp39-win_amd64.whl", hash = "sha256:8cbb6f90f08952fcc90dbf08f0310fdf4d61096c5cb7db8adf03e23f3b857ae5"}, + {file = "mmh3-4.0.1-cp39-cp39-win_arm64.whl", hash = "sha256:ce71856cbca9d7c74d084eeee1bc5b126ed197c1c9530a4fdb994d099b9bc4db"}, + {file = "mmh3-4.0.1.tar.gz", hash = "sha256:ad8be695dc4e44a79631748ba5562d803f0ac42d36a6b97a53aca84a70809385"}, +] + +[package.extras] +test = ["mypy (>=1.0)", "pytest (>=7.0.0)"] + [[package]] name = "more-itertools" version = "10.1.0" description = "More routines for operating on iterables, beyond itertools" -optional = true +optional = false python-versions = ">=3.8" files = [ {file = "more-itertools-10.1.0.tar.gz", hash = "sha256:626c369fa0eb37bac0291bce8259b332fd59ac792fa5497b59837309cd5b114a"}, @@ -5140,6 +5373,20 @@ packaging = "*" protobuf = "*" sympy = "*" +[[package]] +name = "openpyxl" +version = "3.1.2" +description = "A Python library to read/write Excel 2010 xlsx/xlsm files" +optional = true +python-versions = ">=3.6" +files = [ + {file = "openpyxl-3.1.2-py2.py3-none-any.whl", hash = "sha256:f91456ead12ab3c6c2e9491cf33ba6d08357d802192379bb482f1033ade496f5"}, + {file = "openpyxl-3.1.2.tar.gz", hash = "sha256:a6f5977418eff3b2d5500d54d9db50c8277a368436f4e4f8ddb1be3422870184"}, +] + 
+[package.dependencies] +et-xmlfile = "*" + [[package]] name = "opentelemetry-api" version = "1.15.0" @@ -6643,12 +6890,12 @@ fastembed = ["fastembed (==0.1.1)"] [[package]] name = "redshift-connector" -version = "2.0.913" +version = "2.0.915" description = "Redshift interface library" optional = true python-versions = ">=3.6" files = [ - {file = "redshift_connector-2.0.913-py3-none-any.whl", hash = "sha256:bd70395c5b7ec9fcae9565daff6bcb88c7d3ea6182dafba2bac6138f68d00582"}, + {file = "redshift_connector-2.0.915-py3-none-any.whl", hash = "sha256:d02e8d6fa01dd46504c879953f6abd7fa72980edd1e6a80202448fe35fb4c9e4"}, ] [package.dependencies] @@ -7605,6 +7852,17 @@ toml = {version = "*", markers = "python_version < \"3.11\""} tqdm = "*" typing-extensions = "*" +[[package]] +name = "sqlparams" +version = "6.0.1" +description = "Convert between various DB API 2.0 parameter styles." +optional = true +python-versions = ">=3.8" +files = [ + {file = "sqlparams-6.0.1-py3-none-any.whl", hash = "sha256:566651376315c832876be4a0f58ffa23a23fab257d77ee492bdf8d301e169d0d"}, + {file = "sqlparams-6.0.1.tar.gz", hash = "sha256:032b2f949d4afbcbfa24003f6fb407f2fc8468184e3d8ca3d59ba6b30d4935bf"}, +] + [[package]] name = "sqlparse" version = "0.4.4" @@ -7713,6 +7971,24 @@ files = [ {file = "text_unidecode-1.3-py2.py3-none-any.whl", hash = "sha256:1311f10e8b895935241623731c2ba64f4c455287888b18189350b67134a822e8"}, ] +[[package]] +name = "thrift" +version = "0.16.0" +description = "Python bindings for the Apache Thrift RPC system" +optional = true +python-versions = "*" +files = [ + {file = "thrift-0.16.0.tar.gz", hash = "sha256:2b5b6488fcded21f9d312aa23c9ff6a0195d0f6ae26ddbd5ad9e3e25dfc14408"}, +] + +[package.dependencies] +six = ">=1.7.2" + +[package.extras] +all = ["tornado (>=4.0)", "twisted"] +tornado = ["tornado (>=4.0)"] +twisted = ["twisted"] + [[package]] name = "tokenizers" version = "0.13.3" @@ -8453,7 +8729,8 @@ athena = ["botocore", "pyarrow", "pyathena", "s3fs"] az = ["adlfs"] bigquery = ["gcsfs", "google-cloud-bigquery", "grpcio", "pyarrow"] cli = ["cron-descriptor", "pipdeptree"] -dbt = ["dbt-athena-community", "dbt-bigquery", "dbt-core", "dbt-duckdb", "dbt-redshift", "dbt-snowflake"] +databricks = ["databricks-sql-connector"] +dbt = ["dbt-athena-community", "dbt-bigquery", "dbt-core", "dbt-databricks", "dbt-duckdb", "dbt-redshift", "dbt-snowflake"] duckdb = ["duckdb"] filesystem = ["botocore", "s3fs"] gcp = ["gcsfs", "google-cloud-bigquery", "grpcio"] @@ -8471,4 +8748,4 @@ weaviate = ["weaviate-client"] [metadata] lock-version = "2.0" python-versions = ">=3.8.1,<3.13" -content-hash = "cf751b2e1e9c66efde0a11774b5204e3206a14fd04ba4c79b2d37e38db5367ad" +content-hash = "2423c6a16d547ee9ab26d59d9fad49fe35fb3e1b85b7c95d82ab33efabb184f6" diff --git a/pyproject.toml b/pyproject.toml index 6436ec23a7..a8d63f7e6d 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -76,9 +76,11 @@ weaviate-client = {version = ">=3.22", optional = true} adlfs = {version = ">=2022.4.0", optional = true} pyodbc = {version = "^4.0.39", optional = true} qdrant-client = {version = "^1.6.4", optional = true, extras = ["fastembed"]} +databricks-sql-connector = {version = ">=2.9.3,<3.0.0", optional = true} +dbt-databricks = {version = "^1.7.3", optional = true} [tool.poetry.extras] -dbt = ["dbt-core", "dbt-redshift", "dbt-bigquery", "dbt-duckdb", "dbt-snowflake", "dbt-athena-community"] +dbt = ["dbt-core", "dbt-redshift", "dbt-bigquery", "dbt-duckdb", "dbt-snowflake", "dbt-athena-community", "dbt-databricks"] gcp = ["grpcio", 
"google-cloud-bigquery", "db-dtypes", "gcsfs"] # bigquery is alias on gcp extras bigquery = ["grpcio", "google-cloud-bigquery", "pyarrow", "db-dtypes", "gcsfs"] @@ -97,6 +99,7 @@ athena = ["pyathena", "pyarrow", "s3fs", "botocore"] weaviate = ["weaviate-client"] mssql = ["pyodbc"] qdrant = ["qdrant-client"] +databricks = ["databricks-sql-connector"] [tool.poetry.scripts] dlt = "dlt.cli._dlt:_main" diff --git a/tests/load/databricks/__init__.py b/tests/load/databricks/__init__.py new file mode 100644 index 0000000000..e69de29bb2 diff --git a/tests/load/databricks/test_databricks_configuration.py b/tests/load/databricks/test_databricks_configuration.py new file mode 100644 index 0000000000..9127e39be4 --- /dev/null +++ b/tests/load/databricks/test_databricks_configuration.py @@ -0,0 +1,32 @@ +import pytest +import os + +pytest.importorskip("databricks") + + +from dlt.destinations.impl.databricks.configuration import DatabricksClientConfiguration +from dlt.common.configuration import resolve_configuration +from tests.utils import preserve_environ + + +def test_databricks_credentials_to_connector_params(): + os.environ["CREDENTIALS__SERVER_HOSTNAME"] = "my-databricks.example.com" + os.environ["CREDENTIALS__HTTP_PATH"] = "/sql/1.0/warehouses/asdfe" + os.environ["CREDENTIALS__ACCESS_TOKEN"] = "my-token" + os.environ["CREDENTIALS__CATALOG"] = "my-catalog" + # JSON encoded dict of extra args + os.environ["CREDENTIALS__CONNECTION_PARAMETERS"] = '{"extra_a": "a", "extra_b": "b"}' + + config = resolve_configuration(DatabricksClientConfiguration(dataset_name="my-dataset")) + + credentials = config.credentials + + params = credentials.to_connector_params() + + assert params["server_hostname"] == "my-databricks.example.com" + assert params["http_path"] == "/sql/1.0/warehouses/asdfe" + assert params["access_token"] == "my-token" + assert params["catalog"] == "my-catalog" + assert params["extra_a"] == "a" + assert params["extra_b"] == "b" + assert params["_socket_timeout"] == credentials.socket_timeout diff --git a/tests/load/pipeline/test_arrow_loading.py b/tests/load/pipeline/test_arrow_loading.py index 4a3c209c32..7f159f57b7 100644 --- a/tests/load/pipeline/test_arrow_loading.py +++ b/tests/load/pipeline/test_arrow_loading.py @@ -35,6 +35,7 @@ def test_load_item( include_time = destination_config.destination not in ( "athena", "redshift", + "databricks", ) # athena/redshift can't load TIME columns from parquet item, records = arrow_table_all_data_types( item_type, include_json=False, include_time=include_time diff --git a/tests/load/pipeline/test_pipelines.py b/tests/load/pipeline/test_pipelines.py index d170fd553b..8fb5b6c292 100644 --- a/tests/load/pipeline/test_pipelines.py +++ b/tests/load/pipeline/test_pipelines.py @@ -788,7 +788,7 @@ def other_data(): column_schemas["col11_precision"]["precision"] = 0 # drop TIME from databases not supporting it via parquet - if destination_config.destination in ["redshift", "athena"]: + if destination_config.destination in ["redshift", "athena", "databricks"]: data_types.pop("col11") data_types.pop("col11_null") data_types.pop("col11_precision") diff --git a/tests/load/pipeline/test_stage_loading.py b/tests/load/pipeline/test_stage_loading.py index de4a7f4c3b..0c3030ebaf 100644 --- a/tests/load/pipeline/test_stage_loading.py +++ b/tests/load/pipeline/test_stage_loading.py @@ -158,6 +158,7 @@ def test_all_data_types(destination_config: DestinationTestConfiguration) -> Non if destination_config.destination in ( "redshift", "athena", + "databricks", ) and 
destination_config.file_format in ("parquet", "jsonl"): # Redshift copy doesn't support TIME column exclude_types.append("time") @@ -167,6 +168,9 @@ def test_all_data_types(destination_config: DestinationTestConfiguration) -> Non ): # Redshift can't load fixed width binary columns from parquet exclude_columns.append("col7_precision") + if destination_config.destination == "databricks" and destination_config.file_format == "jsonl": + exclude_types.extend(["decimal", "binary", "wei", "complex", "date"]) + exclude_columns.append("col1_precision") column_schemas, data_types = table_update_and_row( exclude_types=exclude_types, exclude_columns=exclude_columns diff --git a/tests/load/test_job_client.py b/tests/load/test_job_client.py index 153504bf4a..774c494d28 100644 --- a/tests/load/test_job_client.py +++ b/tests/load/test_job_client.py @@ -28,13 +28,13 @@ from dlt.destinations.job_client_impl import SqlJobClientBase from dlt.common.destination.reference import WithStagingDataset +from tests.cases import table_update_and_row, assert_all_data_types_row from tests.utils import TEST_STORAGE_ROOT, autouse_test_storage from tests.common.utils import load_json_case from tests.load.utils import ( TABLE_UPDATE, TABLE_UPDATE_COLUMNS_SCHEMA, TABLE_ROW_ALL_DATA_TYPES, - assert_all_data_types_row, expect_load_file, load_table, yield_client_with_storage, @@ -389,6 +389,8 @@ def test_get_storage_table_with_all_types(client: SqlJobClientBase) -> None: continue if client.config.destination_type == "mssql" and c["data_type"] in ("complex"): continue + if client.config.destination_type == "databricks" and c["data_type"] in ("complex", "time"): + continue assert c["data_type"] == expected_c["data_type"] @@ -502,9 +504,14 @@ def test_load_with_all_types( if not client.capabilities.preferred_loader_file_format: pytest.skip("preferred loader file format not set, destination will only work with staging") table_name = "event_test_table" + uniq_id() + column_schemas, data_types = table_update_and_row( + exclude_types=["time"] if client.config.destination_type == "databricks" else None, + ) # we should have identical content with all disposition types client.schema.update_table( - new_table(table_name, write_disposition=write_disposition, columns=TABLE_UPDATE) + new_table( + table_name, write_disposition=write_disposition, columns=list(column_schemas.values()) + ) ) client.schema.bump_version() client.update_stored_schema() @@ -521,12 +528,12 @@ def test_load_with_all_types( canonical_name = client.sql_client.make_qualified_table_name(table_name) # write row with io.BytesIO() as f: - write_dataset(client, f, [TABLE_ROW_ALL_DATA_TYPES], TABLE_UPDATE_COLUMNS_SCHEMA) + write_dataset(client, f, [data_types], column_schemas) query = f.getvalue().decode() expect_load_file(client, file_storage, query, table_name) db_row = list(client.sql_client.execute_sql(f"SELECT * FROM {canonical_name}")[0]) # content must equal - assert_all_data_types_row(db_row) + assert_all_data_types_row(db_row, schema=column_schemas) @pytest.mark.parametrize( diff --git a/tests/load/test_sql_client.py b/tests/load/test_sql_client.py index 96f0db09bb..ff0a84e07e 100644 --- a/tests/load/test_sql_client.py +++ b/tests/load/test_sql_client.py @@ -311,16 +311,17 @@ def test_database_exceptions(client: SqlJobClientBase) -> None: pass assert client.sql_client.is_dbapi_exception(term_ex.value.dbapi_exception) with pytest.raises(DatabaseUndefinedRelation) as term_ex: - with client.sql_client.execute_query( - "DELETE FROM TABLE_XXX WHERE 1=1;DELETE FROM 
ticket_forms__ticket_field_ids WHERE 1=1;" - ): - pass + client.sql_client.execute_many( + [ + "DELETE FROM TABLE_XXX WHERE 1=1;", + "DELETE FROM ticket_forms__ticket_field_ids WHERE 1=1;", + ] + ) assert client.sql_client.is_dbapi_exception(term_ex.value.dbapi_exception) with pytest.raises(DatabaseUndefinedRelation) as term_ex: - with client.sql_client.execute_query( - "DROP TABLE TABLE_XXX;DROP TABLE ticket_forms__ticket_field_ids;" - ): - pass + client.sql_client.execute_many( + ["DROP TABLE TABLE_XXX;", "DROP TABLE ticket_forms__ticket_field_ids;"] + ) # invalid syntax with pytest.raises(DatabaseTransientException) as term_ex: @@ -360,7 +361,10 @@ def test_database_exceptions(client: SqlJobClientBase) -> None: @pytest.mark.parametrize( - "client", destinations_configs(default_sql_configs=True), indirect=True, ids=lambda x: x.name + "client", + destinations_configs(default_sql_configs=True, exclude=["databricks"]), + indirect=True, + ids=lambda x: x.name, ) def test_commit_transaction(client: SqlJobClientBase) -> None: table_name = prepare_temp_table(client) @@ -391,7 +395,10 @@ def test_commit_transaction(client: SqlJobClientBase) -> None: @pytest.mark.parametrize( - "client", destinations_configs(default_sql_configs=True), indirect=True, ids=lambda x: x.name + "client", + destinations_configs(default_sql_configs=True, exclude=["databricks"]), + indirect=True, + ids=lambda x: x.name, ) def test_rollback_transaction(client: SqlJobClientBase) -> None: if client.capabilities.supports_transactions is False: @@ -449,7 +456,10 @@ def test_rollback_transaction(client: SqlJobClientBase) -> None: @pytest.mark.parametrize( - "client", destinations_configs(default_sql_configs=True), indirect=True, ids=lambda x: x.name + "client", + destinations_configs(default_sql_configs=True, exclude=["databricks"]), + indirect=True, + ids=lambda x: x.name, ) def test_transaction_isolation(client: SqlJobClientBase) -> None: if client.capabilities.supports_transactions is False: @@ -546,7 +556,10 @@ def test_max_column_identifier_length(client: SqlJobClientBase) -> None: @pytest.mark.parametrize( - "client", destinations_configs(default_sql_configs=True), indirect=True, ids=lambda x: x.name + "client", + destinations_configs(default_sql_configs=True, exclude=["databricks"]), + indirect=True, + ids=lambda x: x.name, ) def test_recover_on_explicit_tx(client: SqlJobClientBase) -> None: if client.capabilities.supports_transactions is False: @@ -567,7 +580,7 @@ def test_recover_on_explicit_tx(client: SqlJobClientBase) -> None: # syntax error within tx statements = ["BEGIN TRANSACTION;", f"INVERT INTO {version_table} VALUES(1);", "COMMIT;"] with pytest.raises(DatabaseTransientException): - client.sql_client.execute_fragments(statements) + client.sql_client.execute_many(statements) # assert derives_from_class_of_name(term_ex.value.dbapi_exception, "ProgrammingError") assert client.get_stored_schema() is not None client.complete_load("EFG") @@ -581,7 +594,7 @@ def test_recover_on_explicit_tx(client: SqlJobClientBase) -> None: ] # cannot insert NULL value with pytest.raises(DatabaseTerminalException): - client.sql_client.execute_fragments(statements) + client.sql_client.execute_many(statements) # assert derives_from_class_of_name(term_ex.value.dbapi_exception, "IntegrityError") # assert isinstance(term_ex.value.dbapi_exception, (psycopg2.InternalError, psycopg2.)) assert client.get_stored_schema() is not None diff --git a/tests/load/utils.py b/tests/load/utils.py index 6811ca59a6..bc9bd9ddce 100644 --- 
a/tests/load/utils.py +++ b/tests/load/utils.py @@ -99,6 +99,7 @@ class DestinationTestConfiguration: supports_merge: bool = True # TODO: take it from client base class force_iceberg: bool = False supports_dbt: bool = True + disable_compression: bool = False @property def name(self) -> str: @@ -121,7 +122,7 @@ def setup(self) -> None: os.environ["DESTINATION__FORCE_ICEBERG"] = str(self.force_iceberg) or "" """For the filesystem destinations we disable compression to make analyzing the result easier""" - if self.destination == "filesystem": + if self.destination == "filesystem" or self.disable_compression: os.environ["DATA_WRITER__DISABLE_COMPRESSION"] = "True" def setup_pipeline( @@ -250,6 +251,36 @@ def destinations_configs( bucket_url=AZ_BUCKET, extra_info="az-authorization", ), + DestinationTestConfiguration( + destination="databricks", + staging="filesystem", + file_format="jsonl", + bucket_url=AWS_BUCKET, + extra_info="s3-authorization", + disable_compression=True, + ), + DestinationTestConfiguration( + destination="databricks", + staging="filesystem", + file_format="jsonl", + bucket_url=AZ_BUCKET, + extra_info="az-authorization", + disable_compression=True, + ), + DestinationTestConfiguration( + destination="databricks", + staging="filesystem", + file_format="parquet", + bucket_url=AWS_BUCKET, + extra_info="s3-authorization", + ), + DestinationTestConfiguration( + destination="databricks", + staging="filesystem", + file_format="parquet", + bucket_url=AZ_BUCKET, + extra_info="az-authorization", + ), ] if all_staging_configs: diff --git a/tests/utils.py b/tests/utils.py index cf172f9733..777dd4a27d 100644 --- a/tests/utils.py +++ b/tests/utils.py @@ -45,6 +45,7 @@ "motherduck", "mssql", "qdrant", + "databricks", } NON_SQL_DESTINATIONS = {"filesystem", "weaviate", "dummy", "motherduck", "qdrant"} SQL_DESTINATIONS = IMPLEMENTED_DESTINATIONS - NON_SQL_DESTINATIONS