Allows any duckdb version, fixes databricks az credentials (#1854)
* allows for any duckdb version >= 0.9

* warns if parent_key is used without parent table

* allows azure service principal credentials with named databricks credential

* makes AssertionError terminal in load
rudolfix authored Sep 24, 2024
1 parent f1f6ff3 commit db4f445
Showing 9 changed files with 96 additions and 167 deletions.
9 changes: 3 additions & 6 deletions dlt/common/destination/reference.py
@@ -27,14 +27,12 @@
 from dlt.common.configuration.specs.base_configuration import extract_inner_hint
 from dlt.common.destination.typing import PreparedTableSchema
 from dlt.common.destination.utils import verify_schema_capabilities, verify_supported_data_types
-from dlt.common.exceptions import TerminalValueError
+from dlt.common.exceptions import TerminalException
 from dlt.common.metrics import LoadJobMetrics
 from dlt.common.normalizers.naming import NamingConvention
 from dlt.common.schema import Schema, TSchemaTables
 from dlt.common.schema.typing import (
     C_DLT_LOAD_ID,
-    _TTableSchemaBase,
-    TWriteDisposition,
     TLoaderReplaceStrategy,
 )
 from dlt.common.schema.utils import fill_hints_from_parent_and_clone_table
@@ -46,7 +44,6 @@
     UnknownDestinationModule,
     DestinationSchemaTampered,
     DestinationTransientException,
-    DestinationTerminalException,
 )
 from dlt.common.schema.exceptions import UnknownTableException
 from dlt.common.storages import FileStorage
@@ -357,7 +354,7 @@ def __init__(self, file_path: str) -> None:
         # ensure file name
         super().__init__(file_path)
         self._state: TLoadJobState = "ready"
-        self._exception: Exception = None
+        self._exception: BaseException = None

         # variables needed by most jobs, set by the loader in set_run_vars
         self._schema: Schema = None
@@ -396,7 +393,7 @@ def run_managed(
             self._job_client.prepare_load_job_execution(self)
             self.run()
             self._state = "completed"
-        except (TerminalException, AssertionError) as e:
+        except (TerminalException, AssertionError) as e:
             self._state = "failed"
             self._exception = e
             logger.exception(f"Terminal exception in job {self.job_id()} in file {self._file_path}")
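The except clause above widens what counts as terminal: any TerminalException subtype plus AssertionError, on the reasoning that a failed assertion signals a code bug that a retry cannot fix. A minimal sketch of the resulting state machine, assuming a job object exposing run() as in this class (the real run_managed also records the exception and metrics):

from dlt.common.exceptions import TerminalException

def run_to_state(job) -> str:
    # hypothetical driver condensing run_managed's new semantics
    try:
        job.run()
        return "completed"
    except (TerminalException, AssertionError):
        # terminal: the job is permanently failed and never retried
        return "failed"
    except Exception:
        # everything else is treated as transient and eligible for retry
        return "retry"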
15 changes: 13 additions & 2 deletions dlt/destinations/impl/databricks/databricks.py
@@ -2,6 +2,9 @@
 from urllib.parse import urlparse, urlunparse

 from dlt import config
+from dlt.common.configuration.specs.azure_credentials import (
+    AzureServicePrincipalCredentialsWithoutDefaults,
+)
 from dlt.common.destination import DestinationCapabilitiesContext
 from dlt.common.destination.reference import (
     HasFollowupJobs,
@@ -95,15 +98,23 @@ def run(self) -> None:
             ))
             """
         elif bucket_scheme in AZURE_BLOB_STORAGE_PROTOCOLS:
-            assert isinstance(staging_credentials, AzureCredentialsWithoutDefaults)
+            assert isinstance(
+                staging_credentials, AzureCredentialsWithoutDefaults
+            ), "AzureCredentialsWithoutDefaults required to pass explicit credential"
             # Explicit azure credentials are needed to load from bucket without a named stage
             credentials_clause = f"""WITH(CREDENTIAL(AZURE_SAS_TOKEN='{staging_credentials.azure_storage_sas_token}'))"""
             bucket_path = self.ensure_databricks_abfss_url(
                 bucket_path, staging_credentials.azure_storage_account_name
             )

         if bucket_scheme in AZURE_BLOB_STORAGE_PROTOCOLS:
-            assert isinstance(staging_credentials, AzureCredentialsWithoutDefaults)
+            assert isinstance(
+                staging_credentials,
+                (
+                    AzureCredentialsWithoutDefaults,
+                    AzureServicePrincipalCredentialsWithoutDefaults,
+                ),
+            )
             bucket_path = self.ensure_databricks_abfss_url(
                 bucket_path, staging_credentials.azure_storage_account_name
             )
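The second assert now also admits service-principal credentials. For context, a sketch of the setup this enables: loading to Databricks via Azure staging authenticated with a service principal instead of a SAS token. The factory arguments and secret values below are illustrative assumptions, not taken from this diff:

import dlt
from dlt.destinations import filesystem
from dlt.common.configuration.specs.azure_credentials import (
    AzureServicePrincipalCredentialsWithoutDefaults,
)

# placeholder service-principal credentials (values are hypothetical)
staging = filesystem(
    bucket_url="az://staging-container",
    credentials=AzureServicePrincipalCredentialsWithoutDefaults(
        azure_storage_account_name="mystorageaccount",
        azure_tenant_id="<tenant-id>",
        azure_client_id="<client-id>",
        azure_client_secret="<client-secret>",
    ),
)

# per the commit message, a named Databricks credential handles storage
# access on the Databricks side, so no explicit SAS token is required
pipeline = dlt.pipeline(
    pipeline_name="databricks_az_principal",
    destination="databricks",
    staging=staging,
)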
3 changes: 2 additions & 1 deletion dlt/load/load.py
@@ -5,6 +5,7 @@
 import os

 from dlt.common import logger
+from dlt.common.exceptions import TerminalException
 from dlt.common.metrics import LoadJobMetrics
 from dlt.common.runtime.signals import sleep
 from dlt.common.configuration import with_config, known_sections
@@ -197,7 +198,7 @@ def submit_job(
                 " extension could not be associated with job type and that indicates an error"
                 " in the code."
             )
-        except DestinationTerminalException:
+        except (TerminalException, AssertionError):
            job = FinalizedLoadJobWithFollowupJobs.from_file_path(
                file_path, "failed", pretty_format_exception()
            )
16 changes: 15 additions & 1 deletion dlt/normalize/validate.py
@@ -1,7 +1,11 @@
 from dlt.common.destination.capabilities import DestinationCapabilitiesContext
 from dlt.common.schema import Schema
 from dlt.common.schema.typing import TTableSchema
-from dlt.common.schema.utils import find_incomplete_columns
+from dlt.common.schema.utils import (
+    find_incomplete_columns,
+    get_first_column_name_with_prop,
+    is_nested_table,
+)
 from dlt.common.schema.exceptions import UnboundColumnException
 from dlt.common import logger

@@ -41,3 +45,13 @@ def verify_normalized_table(
             f"`{table['table_format']}` for table `{table['name']}`. "
             "The setting will probably be ignored."
         )
+
+    parent_key = get_first_column_name_with_prop(table, "parent_key")
+    if parent_key and not is_nested_table(table):
+        logger.warning(
+            f"Table {table['name']} has parent_key on column {parent_key} but no corresponding"
+            " `parent` table hint to refer to the parent table. Such a table is not considered"
+            " nested and the relational normalizer will not generate linking data. The most"
+            " probable cause is manual modification of the dlt schema for the table. The most"
+            f" probable outcome will be a NULL violation during the load process on {parent_key}."
+        )