From 4c5b0e3d2c395aa68e361bb04131e47bf8c6ae40 Mon Sep 17 00:00:00 2001 From: Marcin Rudolf Date: Sun, 24 Mar 2024 11:06:26 +0100 Subject: [PATCH] splits destination exceptions, fixes new schemas in tests --- dlt/common/destination/exceptions.py | 126 ++++++++++++++++++ dlt/common/destination/reference.py | 18 ++- dlt/common/exceptions.py | 109 --------------- dlt/common/pipeline.py | 10 +- dlt/destinations/exceptions.py | 15 +-- dlt/destinations/impl/dummy/dummy.py | 8 +- .../impl/motherduck/configuration.py | 2 +- dlt/destinations/impl/qdrant/qdrant_client.py | 1 + dlt/destinations/impl/weaviate/exceptions.py | 2 +- .../impl/weaviate/weaviate_client.py | 3 +- dlt/destinations/job_client_impl.py | 15 +-- dlt/load/exceptions.py | 10 +- dlt/load/load.py | 14 +- dlt/pipeline/pipeline.py | 4 +- tests/cli/common/test_telemetry_command.py | 1 - tests/common/test_destination.py | 2 +- tests/common/test_utils.py | 6 +- tests/destinations/test_custom_destination.py | 11 +- tests/extract/test_decorators.py | 1 + tests/load/pipeline/test_pipelines.py | 7 +- tests/load/pipeline/test_restore_state.py | 9 +- tests/load/test_sql_client.py | 6 +- tests/pipeline/test_pipeline.py | 4 +- .../test_pipeline_file_format_resolver.py | 2 +- 24 files changed, 193 insertions(+), 193 deletions(-) create mode 100644 dlt/common/destination/exceptions.py diff --git a/dlt/common/destination/exceptions.py b/dlt/common/destination/exceptions.py new file mode 100644 index 0000000000..1b5423ff02 --- /dev/null +++ b/dlt/common/destination/exceptions.py @@ -0,0 +1,126 @@ +from typing import Any, Iterable, List + +from dlt.common.exceptions import DltException, TerminalException, TransientException + + +class DestinationException(DltException): + pass + + +class UnknownDestinationModule(DestinationException): + def __init__(self, destination_module: str) -> None: + self.destination_module = destination_module + if "." in destination_module: + msg = f"Destination module {destination_module} could not be found and imported" + else: + msg = f"Destination {destination_module} is not one of the standard dlt destinations" + super().__init__(msg) + + +class InvalidDestinationReference(DestinationException): + def __init__(self, destination_module: Any) -> None: + self.destination_module = destination_module + msg = f"Destination {destination_module} is not a valid destination module." + super().__init__(msg) + + +class DestinationTerminalException(DestinationException, TerminalException): + pass + + +class DestinationUndefinedEntity(DestinationTerminalException): + pass + + +class DestinationTransientException(DestinationException, TransientException): + pass + + +class DestinationLoadingViaStagingNotSupported(DestinationTerminalException): + def __init__(self, destination: str) -> None: + self.destination = destination + super().__init__(f"Destination {destination} does not support loading via staging.") + + +class DestinationLoadingWithoutStagingNotSupported(DestinationTerminalException): + def __init__(self, destination: str) -> None: + self.destination = destination + super().__init__(f"Destination {destination} does not support loading without staging.") + + +class DestinationNoStagingMode(DestinationTerminalException): + def __init__(self, destination: str) -> None: + self.destination = destination + super().__init__(f"Destination {destination} cannot be used as a staging") + + +class DestinationIncompatibleLoaderFileFormatException(DestinationTerminalException): + def __init__( + self, destination: str, staging: str, file_format: str, supported_formats: Iterable[str] + ) -> None: + self.destination = destination + self.staging = staging + self.file_format = file_format + self.supported_formats = supported_formats + supported_formats_str = ", ".join(supported_formats) + if self.staging: + if not supported_formats: + msg = ( + f"Staging {staging} cannot be used with destination {destination} because they" + " have no file formats in common." + ) + else: + msg = ( + f"Unsupported file format {file_format} for destination {destination} in" + f" combination with staging destination {staging}. Supported formats:" + f" {supported_formats_str}" + ) + else: + msg = ( + f"Unsupported file format {file_format} destination {destination}. Supported" + f" formats: {supported_formats_str}. Check the staging option in the dlt.pipeline" + " for additional formats." + ) + super().__init__(msg) + + +class IdentifierTooLongException(DestinationTerminalException): + def __init__( + self, + destination_name: str, + identifier_type: str, + identifier_name: str, + max_identifier_length: int, + ) -> None: + self.destination_name = destination_name + self.identifier_type = identifier_type + self.identifier_name = identifier_name + self.max_identifier_length = max_identifier_length + super().__init__( + f"The length of {identifier_type} {identifier_name} exceeds" + f" {max_identifier_length} allowed for {destination_name}" + ) + + +class DestinationHasFailedJobs(DestinationTerminalException): + def __init__(self, destination_name: str, load_id: str, failed_jobs: List[Any]) -> None: + self.destination_name = destination_name + self.load_id = load_id + self.failed_jobs = failed_jobs + super().__init__( + f"Destination {destination_name} has failed jobs in load package {load_id}" + ) + + +class DestinationSchemaTampered(DestinationTerminalException): + def __init__(self, schema_name: str, version_hash: str, stored_version_hash: str) -> None: + self.version_hash = version_hash + self.stored_version_hash = stored_version_hash + super().__init__( + f"Schema {schema_name} content was changed - by a loader or by destination code - from" + " the moment it was retrieved by load package. Such schema cannot reliably be updated" + f" nor saved. Current version hash: {version_hash} != stored version hash" + f" {stored_version_hash}. If you are using destination client directly, without storing" + " schema in load package, you should first save it into schema storage. You can also" + " use schema._bump_version() in test code to remove modified flag." + ) diff --git a/dlt/common/destination/reference.py b/dlt/common/destination/reference.py index 258efd80be..738c07bdc7 100644 --- a/dlt/common/destination/reference.py +++ b/dlt/common/destination/reference.py @@ -25,11 +25,6 @@ import inspect from dlt.common import logger -from dlt.common.exceptions import ( - IdentifierTooLongException, - InvalidDestinationReference, - UnknownDestinationModule, -) from dlt.common.schema import Schema, TTableSchema, TSchemaTables from dlt.common.schema.exceptions import SchemaException from dlt.common.schema.utils import ( @@ -43,13 +38,18 @@ from dlt.common.configuration.specs import BaseConfiguration, CredentialsConfiguration from dlt.common.configuration.accessors import config from dlt.common.destination.capabilities import DestinationCapabilitiesContext +from dlt.common.destination.exceptions import ( + IdentifierTooLongException, + InvalidDestinationReference, + UnknownDestinationModule, + DestinationSchemaTampered, +) from dlt.common.schema.utils import is_complete_column from dlt.common.schema.exceptions import UnknownTableException from dlt.common.storages import FileStorage from dlt.common.storages.load_storage import ParsedLoadJobFileName from dlt.common.configuration.specs import GcpCredentials, AwsCredentialsWithoutDefaults - TLoaderReplaceStrategy = Literal["truncate-and-insert", "insert-from-staging", "staging-optimized"] TDestinationConfig = TypeVar("TDestinationConfig", bound="DestinationClientConfiguration") TDestinationClient = TypeVar("TDestinationClient", bound="JobClientBase") @@ -318,6 +318,12 @@ def update_stored_schema( Optional[TSchemaTables]: Returns an update that was applied at the destination. """ self._verify_schema() + # make sure that schema being saved was not modified from the moment it was loaded from storage + version_hash = self.schema.version_hash + if self.schema.is_modified: + raise DestinationSchemaTampered( + self.schema.name, version_hash, self.schema.stored_version_hash + ) return expected_update @abstractmethod diff --git a/dlt/common/exceptions.py b/dlt/common/exceptions.py index c14a743f33..fe526c53dc 100644 --- a/dlt/common/exceptions.py +++ b/dlt/common/exceptions.py @@ -133,115 +133,6 @@ class SystemConfigurationException(DltException): pass -class DestinationException(DltException): - pass - - -class UnknownDestinationModule(DestinationException): - def __init__(self, destination_module: str) -> None: - self.destination_module = destination_module - if "." in destination_module: - msg = f"Destination module {destination_module} could not be found and imported" - else: - msg = f"Destination {destination_module} is not one of the standard dlt destinations" - super().__init__(msg) - - -class InvalidDestinationReference(DestinationException): - def __init__(self, destination_module: Any) -> None: - self.destination_module = destination_module - msg = f"Destination {destination_module} is not a valid destination module." - super().__init__(msg) - - -class DestinationTerminalException(DestinationException, TerminalException): - pass - - -class DestinationUndefinedEntity(DestinationTerminalException): - pass - - -class DestinationTransientException(DestinationException, TransientException): - pass - - -class DestinationLoadingViaStagingNotSupported(DestinationTerminalException): - def __init__(self, destination: str) -> None: - self.destination = destination - super().__init__(f"Destination {destination} does not support loading via staging.") - - -class DestinationLoadingWithoutStagingNotSupported(DestinationTerminalException): - def __init__(self, destination: str) -> None: - self.destination = destination - super().__init__(f"Destination {destination} does not support loading without staging.") - - -class DestinationNoStagingMode(DestinationTerminalException): - def __init__(self, destination: str) -> None: - self.destination = destination - super().__init__(f"Destination {destination} cannot be used as a staging") - - -class DestinationIncompatibleLoaderFileFormatException(DestinationTerminalException): - def __init__( - self, destination: str, staging: str, file_format: str, supported_formats: Iterable[str] - ) -> None: - self.destination = destination - self.staging = staging - self.file_format = file_format - self.supported_formats = supported_formats - supported_formats_str = ", ".join(supported_formats) - if self.staging: - if not supported_formats: - msg = ( - f"Staging {staging} cannot be used with destination {destination} because they" - " have no file formats in common." - ) - else: - msg = ( - f"Unsupported file format {file_format} for destination {destination} in" - f" combination with staging destination {staging}. Supported formats:" - f" {supported_formats_str}" - ) - else: - msg = ( - f"Unsupported file format {file_format} destination {destination}. Supported" - f" formats: {supported_formats_str}. Check the staging option in the dlt.pipeline" - " for additional formats." - ) - super().__init__(msg) - - -class IdentifierTooLongException(DestinationTerminalException): - def __init__( - self, - destination_name: str, - identifier_type: str, - identifier_name: str, - max_identifier_length: int, - ) -> None: - self.destination_name = destination_name - self.identifier_type = identifier_type - self.identifier_name = identifier_name - self.max_identifier_length = max_identifier_length - super().__init__( - f"The length of {identifier_type} {identifier_name} exceeds" - f" {max_identifier_length} allowed for {destination_name}" - ) - - -class DestinationHasFailedJobs(DestinationTerminalException): - def __init__(self, destination_name: str, load_id: str, failed_jobs: List[Any]) -> None: - self.destination_name = destination_name - self.load_id = load_id - self.failed_jobs = failed_jobs - super().__init__( - f"Destination {destination_name} has failed jobs in load package {load_id}" - ) - - class PipelineException(DltException): def __init__(self, pipeline_name: str, msg: str) -> None: """Base class for all pipeline exceptions. Should not be raised.""" diff --git a/dlt/common/pipeline.py b/dlt/common/pipeline.py index 3cbaafefbe..57dda11c39 100644 --- a/dlt/common/pipeline.py +++ b/dlt/common/pipeline.py @@ -32,24 +32,18 @@ from dlt.common.configuration.paths import get_dlt_data_dir from dlt.common.configuration.specs import RunConfiguration from dlt.common.destination import TDestinationReferenceArg, TDestination -from dlt.common.exceptions import ( - DestinationHasFailedJobs, - PipelineStateNotAvailable, - SourceSectionNotAvailable, -) +from dlt.common.destination.exceptions import DestinationHasFailedJobs +from dlt.common.exceptions import PipelineStateNotAvailable, SourceSectionNotAvailable from dlt.common.schema import Schema from dlt.common.schema.typing import TColumnNames, TColumnSchema, TWriteDisposition, TSchemaContract from dlt.common.source import get_current_pipe_name from dlt.common.storages.load_storage import LoadPackageInfo -from dlt.common.storages.load_package import PackageStorage - from dlt.common.time import ensure_pendulum_datetime, precise_time from dlt.common.typing import DictStrAny, REPattern, StrAny, SupportsHumanize from dlt.common.jsonpath import delete_matches, TAnyJsonPath from dlt.common.data_writers.writers import DataWriterMetrics, TLoaderFileFormat from dlt.common.utils import RowCounts, merge_row_counts from dlt.common.versioned_state import TVersionedState -from dlt.common.storages.load_package import TLoadPackageState class _StepInfo(NamedTuple): diff --git a/dlt/destinations/exceptions.py b/dlt/destinations/exceptions.py index cc4d4fd836..5e6adb007d 100644 --- a/dlt/destinations/exceptions.py +++ b/dlt/destinations/exceptions.py @@ -1,5 +1,6 @@ from typing import Sequence -from dlt.common.exceptions import ( + +from dlt.common.destination.exceptions import ( DestinationTerminalException, DestinationTransientException, DestinationUndefinedEntity, @@ -63,18 +64,6 @@ def __init__(self, table_name: str, columns: Sequence[str], msg: str) -> None: ) -class DestinationSchemaTampered(DestinationTerminalException): - def __init__(self, schema_name: str, version_hash: str, stored_version_hash: str) -> None: - self.version_hash = version_hash - self.stored_version_hash = stored_version_hash - super().__init__( - f"Schema {schema_name} content was changed - by a loader or by destination code - from" - " the moment it was retrieved by load package. Such schema cannot reliably be updated" - f" or saved. Current version hash: {version_hash} != stored version hash" - f" {stored_version_hash}" - ) - - class LoadJobNotExistsException(DestinationTerminalException): def __init__(self, job_id: str) -> None: super().__init__(f"Job with id/file name {job_id} not found") diff --git a/dlt/destinations/impl/dummy/dummy.py b/dlt/destinations/impl/dummy/dummy.py index c46e329819..0d91220d88 100644 --- a/dlt/destinations/impl/dummy/dummy.py +++ b/dlt/destinations/impl/dummy/dummy.py @@ -16,9 +16,12 @@ from dlt.common import pendulum from dlt.common.schema import Schema, TTableSchema, TSchemaTables -from dlt.common.schema.typing import TWriteDisposition from dlt.common.storages import FileStorage from dlt.common.destination import DestinationCapabilitiesContext +from dlt.common.destination.exceptions import ( + DestinationTerminalException, + DestinationTransientException, +) from dlt.common.destination.reference import ( FollowupJob, NewLoadJob, @@ -32,10 +35,7 @@ from dlt.destinations.exceptions import ( LoadJobNotExistsException, LoadJobInvalidStateTransitionException, - DestinationTerminalException, - DestinationTransientException, ) - from dlt.destinations.impl.dummy import capabilities from dlt.destinations.impl.dummy.configuration import DummyClientConfiguration from dlt.destinations.job_impl import NewReferenceJob diff --git a/dlt/destinations/impl/motherduck/configuration.py b/dlt/destinations/impl/motherduck/configuration.py index f4ab571e5c..35f02f709a 100644 --- a/dlt/destinations/impl/motherduck/configuration.py +++ b/dlt/destinations/impl/motherduck/configuration.py @@ -2,7 +2,7 @@ from dlt.common.configuration import configspec from dlt.common.destination.reference import DestinationClientDwhWithStagingConfiguration -from dlt.common.exceptions import DestinationTerminalException +from dlt.common.destination.exceptions import DestinationTerminalException from dlt.common.typing import TSecretValue from dlt.common.utils import digest128 from dlt.common.configuration.exceptions import ConfigurationValueError diff --git a/dlt/destinations/impl/qdrant/qdrant_client.py b/dlt/destinations/impl/qdrant/qdrant_client.py index 2df3023d86..febfe38ec9 100644 --- a/dlt/destinations/impl/qdrant/qdrant_client.py +++ b/dlt/destinations/impl/qdrant/qdrant_client.py @@ -283,6 +283,7 @@ def _delete_sentinel_collection(self) -> None: def update_stored_schema( self, only_tables: Iterable[str] = None, expected_update: TSchemaTables = None ) -> Optional[TSchemaTables]: + super().update_stored_schema(only_tables, expected_update) applied_update: TSchemaTables = {} schema_info = self.get_stored_schema_by_hash(self.schema.stored_version_hash) if schema_info is None: diff --git a/dlt/destinations/impl/weaviate/exceptions.py b/dlt/destinations/impl/weaviate/exceptions.py index bff1b4cacc..ee798e4e76 100644 --- a/dlt/destinations/impl/weaviate/exceptions.py +++ b/dlt/destinations/impl/weaviate/exceptions.py @@ -1,4 +1,4 @@ -from dlt.common.exceptions import DestinationException, DestinationTerminalException +from dlt.common.destination.exceptions import DestinationException, DestinationTerminalException class WeaviateBatchError(DestinationException): diff --git a/dlt/destinations/impl/weaviate/weaviate_client.py b/dlt/destinations/impl/weaviate/weaviate_client.py index 2d23dc38f7..6486a75e6e 100644 --- a/dlt/destinations/impl/weaviate/weaviate_client.py +++ b/dlt/destinations/impl/weaviate/weaviate_client.py @@ -14,7 +14,7 @@ cast, ) -from dlt.common.exceptions import ( +from dlt.common.destination.exceptions import ( DestinationUndefinedEntity, DestinationTransientException, DestinationTerminalException, @@ -424,6 +424,7 @@ def _delete_sentinel_class(self) -> None: def update_stored_schema( self, only_tables: Iterable[str] = None, expected_update: TSchemaTables = None ) -> Optional[TSchemaTables]: + super().update_stored_schema(only_tables, expected_update) # Retrieve the schema from Weaviate applied_update: TSchemaTables = {} try: diff --git a/dlt/destinations/job_client_impl.py b/dlt/destinations/job_client_impl.py index 7896fa2cc4..ea0d10d11d 100644 --- a/dlt/destinations/job_client_impl.py +++ b/dlt/destinations/job_client_impl.py @@ -35,13 +35,13 @@ ) from dlt.common.storages import FileStorage from dlt.common.schema import TColumnSchema, Schema, TTableSchemaColumns, TSchemaTables +from dlt.common.schema.typing import LOADS_TABLE_NAME, VERSION_TABLE_NAME from dlt.common.destination.reference import ( StateInfo, StorageSchemaInfo, WithStateSync, DestinationClientConfiguration, DestinationClientDwhConfiguration, - DestinationClientDwhWithStagingConfiguration, NewLoadJob, WithStagingDataset, TLoadJobState, @@ -50,15 +50,10 @@ FollowupJob, CredentialsConfiguration, ) -from dlt.destinations.exceptions import ( - DatabaseUndefinedRelation, - DestinationSchemaTampered, - DestinationSchemaWillNotUpdate, -) + +from dlt.destinations.exceptions import DatabaseUndefinedRelation from dlt.destinations.job_impl import EmptyLoadJobWithoutFollowup, NewReferenceJob from dlt.destinations.sql_jobs import SqlMergeJob, SqlStagingCopyJob -from dlt.common.schema.typing import LOADS_TABLE_NAME, VERSION_TABLE_NAME - from dlt.destinations.typing import TNativeConn from dlt.destinations.sql_client import SqlClientBase @@ -539,10 +534,6 @@ def _replace_schema_in_storage(self, schema: Schema) -> None: self._update_schema_in_storage(schema) def _update_schema_in_storage(self, schema: Schema) -> None: - # make sure that schema being saved was not modified from the moment it was loaded from storage - version_hash = schema.version_hash - if version_hash != schema.stored_version_hash: - raise DestinationSchemaTampered(schema.name, version_hash, schema.stored_version_hash) # get schema string or zip schema_str = json.dumps(schema.to_dict()) # TODO: not all databases store data as utf-8 but this exception is mostly for redshift diff --git a/dlt/load/exceptions.py b/dlt/load/exceptions.py index 8a704660ce..e85dffd2e9 100644 --- a/dlt/load/exceptions.py +++ b/dlt/load/exceptions.py @@ -1,10 +1,8 @@ from typing import Sequence -from dlt.destinations.exceptions import DestinationTerminalException, DestinationTransientException - - -# class LoadException(DltException): -# def __init__(self, msg: str) -> None: -# super().__init__(msg) +from dlt.common.destination.exceptions import ( + DestinationTerminalException, + DestinationTransientException, +) class LoadClientJobFailed(DestinationTerminalException): diff --git a/dlt/load/load.py b/dlt/load/load.py index 23c3dea820..a0909fa2d0 100644 --- a/dlt/load/load.py +++ b/dlt/load/load.py @@ -21,15 +21,10 @@ from dlt.common.runners import TRunMetrics, Runnable, workermethod, NullExecutor from dlt.common.runtime.collector import Collector, NULL_COLLECTOR from dlt.common.runtime.logger import pretty_format_exception -from dlt.common.exceptions import ( - TerminalValueError, - DestinationTerminalException, - DestinationTransientException, -) +from dlt.common.exceptions import TerminalValueError from dlt.common.configuration.container import Container - +from dlt.common.configuration.specs.config_section_context import ConfigSectionContext from dlt.common.schema import Schema, TSchemaTables - from dlt.common.storages import LoadStorage from dlt.common.destination.reference import ( DestinationClientDwhConfiguration, @@ -44,7 +39,10 @@ SupportsStagingDestination, TDestination, ) -from dlt.common.configuration.specs.config_section_context import ConfigSectionContext +from dlt.common.destination.exceptions import ( + DestinationTerminalException, + DestinationTransientException, +) from dlt.destinations.job_impl import EmptyLoadJob diff --git a/dlt/pipeline/pipeline.py b/dlt/pipeline/pipeline.py index 9f4d5c2db7..efb6ae078b 100644 --- a/dlt/pipeline/pipeline.py +++ b/dlt/pipeline/pipeline.py @@ -28,14 +28,14 @@ ) from dlt.common.configuration.specs.config_section_context import ConfigSectionContext from dlt.common.configuration.resolve import initialize_credentials -from dlt.common.exceptions import ( +from dlt.common.destination.exceptions import ( DestinationLoadingViaStagingNotSupported, DestinationLoadingWithoutStagingNotSupported, DestinationNoStagingMode, - MissingDependencyException, DestinationUndefinedEntity, DestinationIncompatibleLoaderFileFormatException, ) +from dlt.common.exceptions import MissingDependencyException from dlt.common.normalizers import explicit_normalizers, import_normalizers from dlt.common.runtime import signals, initialize_runtime from dlt.common.schema.typing import ( diff --git a/tests/cli/common/test_telemetry_command.py b/tests/cli/common/test_telemetry_command.py index 18bd67a5e0..1b6588c9c8 100644 --- a/tests/cli/common/test_telemetry_command.py +++ b/tests/cli/common/test_telemetry_command.py @@ -139,7 +139,6 @@ def test_instrumentation_wrappers() -> None: COMMAND_DEPLOY_REPO_LOCATION, DeploymentMethods, ) - from dlt.common.exceptions import UnknownDestinationModule with patch("dlt.common.runtime.segment.before_send", _mock_before_send): start_test_telemetry() diff --git a/tests/common/test_destination.py b/tests/common/test_destination.py index b93cb5b483..5240b889f3 100644 --- a/tests/common/test_destination.py +++ b/tests/common/test_destination.py @@ -2,7 +2,7 @@ from dlt.common.destination.reference import DestinationClientDwhConfiguration, Destination from dlt.common.destination import DestinationCapabilitiesContext -from dlt.common.exceptions import InvalidDestinationReference, UnknownDestinationModule +from dlt.common.destination.exceptions import InvalidDestinationReference, UnknownDestinationModule from dlt.common.schema import Schema from tests.utils import ACTIVE_DESTINATIONS diff --git a/tests/common/test_utils.py b/tests/common/test_utils.py index 456ef3cb91..586f8d6546 100644 --- a/tests/common/test_utils.py +++ b/tests/common/test_utils.py @@ -3,7 +3,7 @@ import binascii import pytest from typing import Dict -from dlt.common.exceptions import IdentifierTooLongException, PipelineException, TerminalValueError +from dlt.common.exceptions import PipelineException, TerminalValueError from dlt.common.runners import Venv from dlt.common.utils import ( @@ -231,6 +231,8 @@ def test_extend_list_deduplicated() -> None: def test_exception_traces() -> None: + from dlt.common.destination.exceptions import IdentifierTooLongException + # bare exception without stack trace trace = get_exception_trace(Exception("Message")) assert trace["message"] == "Message" @@ -262,6 +264,8 @@ def test_exception_traces() -> None: def test_exception_trace_chain() -> None: + from dlt.common.destination.exceptions import IdentifierTooLongException + try: raise TerminalValueError("Val") except Exception: diff --git a/tests/destinations/test_custom_destination.py b/tests/destinations/test_custom_destination.py index 7b74e5406c..7280ec419b 100644 --- a/tests/destinations/test_custom_destination.py +++ b/tests/destinations/test_custom_destination.py @@ -12,16 +12,16 @@ from dlt.common.schema import TTableSchema from dlt.common.data_writers.writers import TLoaderFileFormat from dlt.common.destination.reference import Destination -from dlt.pipeline.exceptions import PipelineStepFailed -from dlt.common.utils import uniq_id -from dlt.common.exceptions import DestinationTerminalException, InvalidDestinationReference +from dlt.common.destination.exceptions import InvalidDestinationReference from dlt.common.configuration.exceptions import ConfigFieldMissingException from dlt.common.configuration.specs import ConnectionStringCredentials -from dlt.destinations.impl.destination.factory import _DESTINATIONS -from dlt.destinations.impl.destination.configuration import CustomDestinationClientConfiguration from dlt.common.configuration.inject import get_fun_spec from dlt.common.configuration.specs import BaseConfiguration +from dlt.destinations.impl.destination.factory import _DESTINATIONS +from dlt.destinations.impl.destination.configuration import CustomDestinationClientConfiguration +from dlt.pipeline.exceptions import PipelineStepFailed + from tests.load.utils import ( TABLE_ROW_ALL_DATA_TYPES, TABLE_UPDATE_COLUMNS_SCHEMA, @@ -462,7 +462,6 @@ class MyDestinationSpec(CustomDestinationClientConfiguration): def sink_func_with_spec( items: TDataItems, table: TTableSchema, my_predefined_val=dlt.config.value ) -> None: - # raise DestinationTerminalException("NEVER") pass wrapped_callable = sink_func_with_spec().config_params["destination_callable"] diff --git a/tests/extract/test_decorators.py b/tests/extract/test_decorators.py index 06e2ed4efb..dca4c0be6e 100644 --- a/tests/extract/test_decorators.py +++ b/tests/extract/test_decorators.py @@ -52,6 +52,7 @@ def resource(): # simple generated table schema assert resource().compute_table_schema() == { + "columns": {}, "name": "resource", "resource": "resource", "write_disposition": "append", diff --git a/tests/load/pipeline/test_pipelines.py b/tests/load/pipeline/test_pipelines.py index 05c70e2f62..017bef2c01 100644 --- a/tests/load/pipeline/test_pipelines.py +++ b/tests/load/pipeline/test_pipelines.py @@ -6,13 +6,16 @@ import dlt -from dlt.common.pipeline import SupportsPipeline from dlt.common import json, sleep +from dlt.common.pipeline import SupportsPipeline from dlt.common.destination import Destination +from dlt.common.destination.exceptions import DestinationHasFailedJobs +from dlt.common.schema.exceptions import CannotCoerceColumnException from dlt.common.schema.schema import Schema from dlt.common.schema.typing import VERSION_TABLE_NAME from dlt.common.typing import TDataItem from dlt.common.utils import uniq_id + from dlt.destinations.exceptions import DatabaseUndefinedRelation from dlt.extract.exceptions import ResourceNameMissing from dlt.extract import DltSource @@ -21,8 +24,6 @@ PipelineConfigMissing, PipelineStepFailed, ) -from dlt.common.schema.exceptions import CannotCoerceColumnException -from dlt.common.exceptions import DestinationHasFailedJobs from tests.utils import TEST_STORAGE_ROOT, data_to_item_format, preserve_environ from tests.pipeline.utils import assert_data_table_counts, assert_load_info diff --git a/tests/load/pipeline/test_restore_state.py b/tests/load/pipeline/test_restore_state.py index 02da91cefe..e50654adcc 100644 --- a/tests/load/pipeline/test_restore_state.py +++ b/tests/load/pipeline/test_restore_state.py @@ -6,12 +6,12 @@ import dlt from dlt.common import pendulum -from dlt.common.schema.schema import Schema, utils -from dlt.common.utils import custom_environ, uniq_id -from dlt.common.exceptions import DestinationUndefinedEntity +from dlt.common.schema.schema import Schema +from dlt.common.utils import uniq_id +from dlt.common.destination.exceptions import DestinationUndefinedEntity + from dlt.load import Load from dlt.pipeline.exceptions import SqlClientNotAvailable - from dlt.pipeline.pipeline import Pipeline from dlt.pipeline.state_sync import ( STATE_TABLE_COLUMNS, @@ -207,6 +207,7 @@ def _make_dn_name(schema_name: str) -> str: job_client ) == default_schema.naming.normalize_table_identifier(dataset_name) schema_two = Schema("two") + schema_two._bump_version() with p._get_destination_clients(schema_two)[0] as job_client: # use the job_client to do that job_client.initialize_storage() diff --git a/tests/load/test_sql_client.py b/tests/load/test_sql_client.py index ed0c3255d7..d82925a7d3 100644 --- a/tests/load/test_sql_client.py +++ b/tests/load/test_sql_client.py @@ -5,17 +5,17 @@ from time import sleep from dlt.common import pendulum, Decimal -from dlt.common.exceptions import IdentifierTooLongException +from dlt.common.destination.exceptions import IdentifierTooLongException from dlt.common.schema.typing import LOADS_TABLE_NAME, VERSION_TABLE_NAME from dlt.common.storages import FileStorage -from dlt.common.utils import derives_from_class_of_name, uniq_id +from dlt.common.utils import uniq_id + from dlt.destinations.exceptions import ( DatabaseException, DatabaseTerminalException, DatabaseTransientException, DatabaseUndefinedRelation, ) - from dlt.destinations.sql_client import DBApiCursor, SqlClientBase from dlt.destinations.job_client_impl import SqlJobClientBase from dlt.destinations.typing import TNativeConn diff --git a/tests/pipeline/test_pipeline.py b/tests/pipeline/test_pipeline.py index dfe9fe8773..37356c2b44 100644 --- a/tests/pipeline/test_pipeline.py +++ b/tests/pipeline/test_pipeline.py @@ -20,12 +20,12 @@ from dlt.common.configuration.specs.gcp_credentials import GcpOAuthCredentials from dlt.common.destination import DestinationCapabilitiesContext from dlt.common.destination.reference import WithStateSync -from dlt.common.exceptions import ( +from dlt.common.destination.exceptions import ( DestinationHasFailedJobs, DestinationTerminalException, - PipelineStateNotAvailable, UnknownDestinationModule, ) +from dlt.common.exceptions import PipelineStateNotAvailable from dlt.common.pipeline import LoadInfo, PipelineContext from dlt.common.runtime.collector import LogCollector from dlt.common.schema.utils import new_column, new_table diff --git a/tests/pipeline/test_pipeline_file_format_resolver.py b/tests/pipeline/test_pipeline_file_format_resolver.py index 49a38c455b..588ad720a5 100644 --- a/tests/pipeline/test_pipeline_file_format_resolver.py +++ b/tests/pipeline/test_pipeline_file_format_resolver.py @@ -3,7 +3,7 @@ import dlt import pytest -from dlt.common.exceptions import ( +from dlt.common.destination.exceptions import ( DestinationIncompatibleLoaderFileFormatException, DestinationLoadingViaStagingNotSupported, DestinationNoStagingMode,