Skip to content

Commit

Permalink
splits destination exceptions, fixes new schemas in tests
Browse files Browse the repository at this point in the history
  • Loading branch information
rudolfix committed Mar 24, 2024
1 parent 4810bae commit 4c5b0e3
Show file tree
Hide file tree
Showing 24 changed files with 193 additions and 193 deletions.
126 changes: 126 additions & 0 deletions dlt/common/destination/exceptions.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,126 @@
from typing import Any, Iterable, List

from dlt.common.exceptions import DltException, TerminalException, TransientException


class DestinationException(DltException):
    """Base class for destination-related exceptions."""


class UnknownDestinationModule(DestinationException):
    """Reports a destination name that could not be resolved.

    Args:
        destination_module: The destination name or dotted module path that failed.
    """

    def __init__(self, destination_module: str) -> None:
        self.destination_module = destination_module
        # a dotted name is reported as a failed module import, a bare name as
        # an unknown standard destination
        is_module_path = "." in destination_module
        super().__init__(
            f"Destination module {destination_module} could not be found and imported"
            if is_module_path
            else f"Destination {destination_module} is not one of the standard dlt destinations"
        )


class InvalidDestinationReference(DestinationException):
    """Reports an object that is not a valid destination module.

    Args:
        destination_module: The offending object; it is interpolated into the message.
    """

    def __init__(self, destination_module: Any) -> None:
        self.destination_module = destination_module
        super().__init__(f"Destination {destination_module} is not a valid destination module.")


class DestinationTerminalException(DestinationException, TerminalException):
    """Destination exception that is also a `TerminalException`."""


class DestinationUndefinedEntity(DestinationTerminalException):
    """Terminal destination exception.

    NOTE(review): judging by the name, this signals an entity that does not exist
    at the destination - confirm against the raise sites.
    """


class DestinationTransientException(DestinationException, TransientException):
    """Destination exception that is also a `TransientException`."""


class DestinationLoadingViaStagingNotSupported(DestinationTerminalException):
    """Reports a destination that does not support loading via staging.

    Args:
        destination: Name of the destination; kept on the instance as `destination`.
    """

    def __init__(self, destination: str) -> None:
        self.destination = destination
        msg = f"Destination {destination} does not support loading via staging."
        super().__init__(msg)


class DestinationLoadingWithoutStagingNotSupported(DestinationTerminalException):
    """Reports a destination that does not support loading without staging.

    Args:
        destination: Name of the destination; kept on the instance as `destination`.
    """

    def __init__(self, destination: str) -> None:
        self.destination = destination
        msg = f"Destination {destination} does not support loading without staging."
        super().__init__(msg)


class DestinationNoStagingMode(DestinationTerminalException):
    """Reports a destination that cannot be used as a staging destination.

    Args:
        destination: Name of the destination; kept on the instance as `destination`.
    """

    def __init__(self, destination: str) -> None:
        self.destination = destination
        msg = f"Destination {destination} cannot be used as a staging"
        super().__init__(msg)


class DestinationIncompatibleLoaderFileFormatException(DestinationTerminalException):
    """Reports a loader file format that a destination cannot accept.

    Three messages are possible: staging and destination share no formats at all,
    the format is unsupported for the destination/staging combination, or the
    format is unsupported for the destination when no staging is given.

    Args:
        destination: Name of the destination.
        staging: Name of the staging destination; a falsy value selects the
            no-staging message.
        file_format: The file format that was rejected.
        supported_formats: The formats that are supported. Any iterable is
            accepted, including a one-shot iterator.
    """

    def __init__(
        self, destination: str, staging: str, file_format: str, supported_formats: Iterable[str]
    ) -> None:
        self.destination = destination
        self.staging = staging
        self.file_format = file_format
        # an iterator returns itself from iter(); it would be exhausted by the join below,
        # is always truthy and would be stored empty, so materialize it once. Containers
        # (list, set, tuple...) are kept as passed.
        if iter(supported_formats) is supported_formats:
            supported_formats = list(supported_formats)
        self.supported_formats = supported_formats
        supported_formats_str = ", ".join(supported_formats)
        if self.staging:
            if not supported_formats:
                msg = (
                    f"Staging {staging} cannot be used with destination {destination} because they"
                    " have no file formats in common."
                )
            else:
                msg = (
                    f"Unsupported file format {file_format} for destination {destination} in"
                    f" combination with staging destination {staging}. Supported formats:"
                    f" {supported_formats_str}"
                )
        else:
            # message previously lacked "for" before "destination"
            msg = (
                f"Unsupported file format {file_format} for destination {destination}. Supported"
                f" formats: {supported_formats_str}. Check the staging option in the dlt.pipeline"
                " for additional formats."
            )
        super().__init__(msg)


class IdentifierTooLongException(DestinationTerminalException):
    """Reports an identifier longer than the maximum allowed for a destination.

    Args:
        destination_name: Name of the destination imposing the limit.
        identifier_type: Kind of identifier, interpolated into the message.
        identifier_name: The identifier that is too long.
        max_identifier_length: The maximum length reported in the message.
    """

    def __init__(
        self,
        destination_name: str,
        identifier_type: str,
        identifier_name: str,
        max_identifier_length: int,
    ) -> None:
        self.destination_name = destination_name
        self.identifier_type = identifier_type
        self.identifier_name = identifier_name
        self.max_identifier_length = max_identifier_length
        msg = (
            f"The length of {identifier_type} {identifier_name} exceeds"
            f" {max_identifier_length} allowed for {destination_name}"
        )
        super().__init__(msg)


class DestinationHasFailedJobs(DestinationTerminalException):
    """Reports that a load package has failed jobs at a destination.

    Args:
        destination_name: Name of the destination.
        load_id: Identifier of the load package.
        failed_jobs: The failed jobs; stored on the instance, not included in the message.
    """

    def __init__(self, destination_name: str, load_id: str, failed_jobs: List[Any]) -> None:
        self.destination_name = destination_name
        self.load_id = load_id
        self.failed_jobs = failed_jobs
        msg = f"Destination {destination_name} has failed jobs in load package {load_id}"
        super().__init__(msg)


class DestinationSchemaTampered(DestinationTerminalException):
    """Reports a schema whose content changed after it was retrieved by a load package.

    Args:
        schema_name: Name of the schema that was modified.
        version_hash: The current version hash of the schema.
        stored_version_hash: The version hash the schema was stored with.
    """

    def __init__(self, schema_name: str, version_hash: str, stored_version_hash: str) -> None:
        # keep every constructor argument on the instance, like the sibling exceptions do
        self.schema_name = schema_name
        self.version_hash = version_hash
        self.stored_version_hash = stored_version_hash
        super().__init__(
            f"Schema {schema_name} content was changed - by a loader or by destination code - from"
            " the moment it was retrieved by load package. Such schema cannot reliably be updated"
            f" nor saved. Current version hash: {version_hash} != stored version hash"
            f" {stored_version_hash}. If you are using destination client directly, without storing"
            " schema in load package, you should first save it into schema storage. You can also"
            " use schema._bump_version() in test code to remove modified flag."
        )
18 changes: 12 additions & 6 deletions dlt/common/destination/reference.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,11 +25,6 @@
import inspect

from dlt.common import logger
from dlt.common.exceptions import (
IdentifierTooLongException,
InvalidDestinationReference,
UnknownDestinationModule,
)
from dlt.common.schema import Schema, TTableSchema, TSchemaTables
from dlt.common.schema.exceptions import SchemaException
from dlt.common.schema.utils import (
Expand All @@ -43,13 +38,18 @@
from dlt.common.configuration.specs import BaseConfiguration, CredentialsConfiguration
from dlt.common.configuration.accessors import config
from dlt.common.destination.capabilities import DestinationCapabilitiesContext
from dlt.common.destination.exceptions import (
IdentifierTooLongException,
InvalidDestinationReference,
UnknownDestinationModule,
DestinationSchemaTampered,
)
from dlt.common.schema.utils import is_complete_column
from dlt.common.schema.exceptions import UnknownTableException
from dlt.common.storages import FileStorage
from dlt.common.storages.load_storage import ParsedLoadJobFileName
from dlt.common.configuration.specs import GcpCredentials, AwsCredentialsWithoutDefaults


TLoaderReplaceStrategy = Literal["truncate-and-insert", "insert-from-staging", "staging-optimized"]
TDestinationConfig = TypeVar("TDestinationConfig", bound="DestinationClientConfiguration")
TDestinationClient = TypeVar("TDestinationClient", bound="JobClientBase")
Expand Down Expand Up @@ -318,6 +318,12 @@ def update_stored_schema(
Optional[TSchemaTables]: Returns an update that was applied at the destination.
"""
self._verify_schema()
# make sure that schema being saved was not modified from the moment it was loaded from storage
version_hash = self.schema.version_hash
if self.schema.is_modified:
raise DestinationSchemaTampered(
self.schema.name, version_hash, self.schema.stored_version_hash
)
return expected_update

@abstractmethod
Expand Down
109 changes: 0 additions & 109 deletions dlt/common/exceptions.py
Original file line number Diff line number Diff line change
Expand Up @@ -133,115 +133,6 @@ class SystemConfigurationException(DltException):
pass


class DestinationException(DltException):
    """Base class for destination-related exceptions."""


class UnknownDestinationModule(DestinationException):
    """Reports a destination name that could not be resolved.

    Args:
        destination_module: The destination name or dotted module path that failed.
    """

    def __init__(self, destination_module: str) -> None:
        self.destination_module = destination_module
        # a dotted name is reported as a failed module import, a bare name as
        # an unknown standard destination
        is_module_path = "." in destination_module
        super().__init__(
            f"Destination module {destination_module} could not be found and imported"
            if is_module_path
            else f"Destination {destination_module} is not one of the standard dlt destinations"
        )


class InvalidDestinationReference(DestinationException):
    """Reports an object that is not a valid destination module.

    Args:
        destination_module: The offending object; it is interpolated into the message.
    """

    def __init__(self, destination_module: Any) -> None:
        self.destination_module = destination_module
        super().__init__(f"Destination {destination_module} is not a valid destination module.")


class DestinationTerminalException(DestinationException, TerminalException):
    """Destination exception that is also a `TerminalException`."""


class DestinationUndefinedEntity(DestinationTerminalException):
    """Terminal destination exception.

    NOTE(review): judging by the name, this signals an entity that does not exist
    at the destination - confirm against the raise sites.
    """


class DestinationTransientException(DestinationException, TransientException):
    """Destination exception that is also a `TransientException`."""


class DestinationLoadingViaStagingNotSupported(DestinationTerminalException):
    """Reports a destination that does not support loading via staging.

    Args:
        destination: Name of the destination; kept on the instance as `destination`.
    """

    def __init__(self, destination: str) -> None:
        self.destination = destination
        msg = f"Destination {destination} does not support loading via staging."
        super().__init__(msg)


class DestinationLoadingWithoutStagingNotSupported(DestinationTerminalException):
    """Reports a destination that does not support loading without staging.

    Args:
        destination: Name of the destination; kept on the instance as `destination`.
    """

    def __init__(self, destination: str) -> None:
        self.destination = destination
        msg = f"Destination {destination} does not support loading without staging."
        super().__init__(msg)


class DestinationNoStagingMode(DestinationTerminalException):
    """Reports a destination that cannot be used as a staging destination.

    Args:
        destination: Name of the destination; kept on the instance as `destination`.
    """

    def __init__(self, destination: str) -> None:
        self.destination = destination
        msg = f"Destination {destination} cannot be used as a staging"
        super().__init__(msg)


class DestinationIncompatibleLoaderFileFormatException(DestinationTerminalException):
    """Reports a loader file format that a destination cannot accept.

    Three messages are possible: staging and destination share no formats at all,
    the format is unsupported for the destination/staging combination, or the
    format is unsupported for the destination when no staging is given.

    Args:
        destination: Name of the destination.
        staging: Name of the staging destination; a falsy value selects the
            no-staging message.
        file_format: The file format that was rejected.
        supported_formats: The formats that are supported. Any iterable is
            accepted, including a one-shot iterator.
    """

    def __init__(
        self, destination: str, staging: str, file_format: str, supported_formats: Iterable[str]
    ) -> None:
        self.destination = destination
        self.staging = staging
        self.file_format = file_format
        # an iterator returns itself from iter(); it would be exhausted by the join below,
        # is always truthy and would be stored empty, so materialize it once. Containers
        # (list, set, tuple...) are kept as passed.
        if iter(supported_formats) is supported_formats:
            supported_formats = list(supported_formats)
        self.supported_formats = supported_formats
        supported_formats_str = ", ".join(supported_formats)
        if self.staging:
            if not supported_formats:
                msg = (
                    f"Staging {staging} cannot be used with destination {destination} because they"
                    " have no file formats in common."
                )
            else:
                msg = (
                    f"Unsupported file format {file_format} for destination {destination} in"
                    f" combination with staging destination {staging}. Supported formats:"
                    f" {supported_formats_str}"
                )
        else:
            # message previously lacked "for" before "destination"
            msg = (
                f"Unsupported file format {file_format} for destination {destination}. Supported"
                f" formats: {supported_formats_str}. Check the staging option in the dlt.pipeline"
                " for additional formats."
            )
        super().__init__(msg)


class IdentifierTooLongException(DestinationTerminalException):
    """Reports an identifier longer than the maximum allowed for a destination.

    Args:
        destination_name: Name of the destination imposing the limit.
        identifier_type: Kind of identifier, interpolated into the message.
        identifier_name: The identifier that is too long.
        max_identifier_length: The maximum length reported in the message.
    """

    def __init__(
        self,
        destination_name: str,
        identifier_type: str,
        identifier_name: str,
        max_identifier_length: int,
    ) -> None:
        self.destination_name = destination_name
        self.identifier_type = identifier_type
        self.identifier_name = identifier_name
        self.max_identifier_length = max_identifier_length
        msg = (
            f"The length of {identifier_type} {identifier_name} exceeds"
            f" {max_identifier_length} allowed for {destination_name}"
        )
        super().__init__(msg)


class DestinationHasFailedJobs(DestinationTerminalException):
    """Reports that a load package has failed jobs at a destination.

    Args:
        destination_name: Name of the destination.
        load_id: Identifier of the load package.
        failed_jobs: The failed jobs; stored on the instance, not included in the message.
    """

    def __init__(self, destination_name: str, load_id: str, failed_jobs: List[Any]) -> None:
        self.destination_name = destination_name
        self.load_id = load_id
        self.failed_jobs = failed_jobs
        msg = f"Destination {destination_name} has failed jobs in load package {load_id}"
        super().__init__(msg)


class PipelineException(DltException):
def __init__(self, pipeline_name: str, msg: str) -> None:
"""Base class for all pipeline exceptions. Should not be raised."""
Expand Down
10 changes: 2 additions & 8 deletions dlt/common/pipeline.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,24 +32,18 @@
from dlt.common.configuration.paths import get_dlt_data_dir
from dlt.common.configuration.specs import RunConfiguration
from dlt.common.destination import TDestinationReferenceArg, TDestination
from dlt.common.exceptions import (
DestinationHasFailedJobs,
PipelineStateNotAvailable,
SourceSectionNotAvailable,
)
from dlt.common.destination.exceptions import DestinationHasFailedJobs
from dlt.common.exceptions import PipelineStateNotAvailable, SourceSectionNotAvailable
from dlt.common.schema import Schema
from dlt.common.schema.typing import TColumnNames, TColumnSchema, TWriteDisposition, TSchemaContract
from dlt.common.source import get_current_pipe_name
from dlt.common.storages.load_storage import LoadPackageInfo
from dlt.common.storages.load_package import PackageStorage

from dlt.common.time import ensure_pendulum_datetime, precise_time
from dlt.common.typing import DictStrAny, REPattern, StrAny, SupportsHumanize
from dlt.common.jsonpath import delete_matches, TAnyJsonPath
from dlt.common.data_writers.writers import DataWriterMetrics, TLoaderFileFormat
from dlt.common.utils import RowCounts, merge_row_counts
from dlt.common.versioned_state import TVersionedState
from dlt.common.storages.load_package import TLoadPackageState


class _StepInfo(NamedTuple):
Expand Down
15 changes: 2 additions & 13 deletions dlt/destinations/exceptions.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
from typing import Sequence
from dlt.common.exceptions import (

from dlt.common.destination.exceptions import (
DestinationTerminalException,
DestinationTransientException,
DestinationUndefinedEntity,
Expand Down Expand Up @@ -63,18 +64,6 @@ def __init__(self, table_name: str, columns: Sequence[str], msg: str) -> None:
)


class DestinationSchemaTampered(DestinationTerminalException):
    """Reports a schema whose content changed after it was retrieved by a load package.

    Args:
        schema_name: Name of the schema that was modified.
        version_hash: The current version hash of the schema.
        stored_version_hash: The version hash the schema was stored with.
    """

    def __init__(self, schema_name: str, version_hash: str, stored_version_hash: str) -> None:
        # keep every constructor argument on the instance, not only the two hashes
        self.schema_name = schema_name
        self.version_hash = version_hash
        self.stored_version_hash = stored_version_hash
        super().__init__(
            f"Schema {schema_name} content was changed - by a loader or by destination code - from"
            " the moment it was retrieved by load package. Such schema cannot reliably be updated"
            f" or saved. Current version hash: {version_hash} != stored version hash"
            f" {stored_version_hash}"
        )


class LoadJobNotExistsException(DestinationTerminalException):
def __init__(self, job_id: str) -> None:
super().__init__(f"Job with id/file name {job_id} not found")
Expand Down
8 changes: 4 additions & 4 deletions dlt/destinations/impl/dummy/dummy.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,12 @@

from dlt.common import pendulum
from dlt.common.schema import Schema, TTableSchema, TSchemaTables
from dlt.common.schema.typing import TWriteDisposition
from dlt.common.storages import FileStorage
from dlt.common.destination import DestinationCapabilitiesContext
from dlt.common.destination.exceptions import (
DestinationTerminalException,
DestinationTransientException,
)
from dlt.common.destination.reference import (
FollowupJob,
NewLoadJob,
Expand All @@ -32,10 +35,7 @@
from dlt.destinations.exceptions import (
LoadJobNotExistsException,
LoadJobInvalidStateTransitionException,
DestinationTerminalException,
DestinationTransientException,
)

from dlt.destinations.impl.dummy import capabilities
from dlt.destinations.impl.dummy.configuration import DummyClientConfiguration
from dlt.destinations.job_impl import NewReferenceJob
Expand Down
2 changes: 1 addition & 1 deletion dlt/destinations/impl/motherduck/configuration.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

from dlt.common.configuration import configspec
from dlt.common.destination.reference import DestinationClientDwhWithStagingConfiguration
from dlt.common.exceptions import DestinationTerminalException
from dlt.common.destination.exceptions import DestinationTerminalException
from dlt.common.typing import TSecretValue
from dlt.common.utils import digest128
from dlt.common.configuration.exceptions import ConfigurationValueError
Expand Down
1 change: 1 addition & 0 deletions dlt/destinations/impl/qdrant/qdrant_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -283,6 +283,7 @@ def _delete_sentinel_collection(self) -> None:
def update_stored_schema(
self, only_tables: Iterable[str] = None, expected_update: TSchemaTables = None
) -> Optional[TSchemaTables]:
super().update_stored_schema(only_tables, expected_update)
applied_update: TSchemaTables = {}
schema_info = self.get_stored_schema_by_hash(self.schema.stored_version_hash)
if schema_info is None:
Expand Down
2 changes: 1 addition & 1 deletion dlt/destinations/impl/weaviate/exceptions.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
from dlt.common.exceptions import DestinationException, DestinationTerminalException
from dlt.common.destination.exceptions import DestinationException, DestinationTerminalException


class WeaviateBatchError(DestinationException):
Expand Down
Loading

0 comments on commit 4c5b0e3

Please sign in to comment.