adds missing docs / tests and configurable staging dataset name #1555

Merged Jul 8, 2024 · 5 commits
Changes from 1 commit
7 changes: 6 additions & 1 deletion dlt/common/destination/capabilities.py
@@ -62,7 +62,7 @@ class DestinationCapabilitiesContext(ContainerInjectableContext):
    casefold_identifier: Callable[[str], str] = str
    """Casing function applied by destination to represent case insensitive identifiers."""
    has_case_sensitive_identifiers: bool = None
-    """Tells if identifiers in destination are case sensitive, before case_identifier function is applied"""
+    """Tells if destination supports case sensitive identifiers"""
    decimal_precision: Tuple[int, int] = None
    wei_precision: Tuple[int, int] = None
    max_identifier_length: int = None
@@ -96,6 +96,11 @@ class DestinationCapabilitiesContext(ContainerInjectableContext):
    loader_parallelism_strategy: Optional[TLoaderParallelismStrategy] = None
    """The destination can override the parallelism strategy"""

+    def generates_case_sensitive_identifiers(self) -> bool:
+        """Tells if capabilities, as currently adjusted, will generate case sensitive identifiers"""
+        # must have case sensitive support and the folding function must preserve casing
+        return self.has_case_sensitive_identifiers and self.casefold_identifier is str
+
    @staticmethod
    def generic_capabilities(
        preferred_loader_file_format: TLoaderFileFormat = None,

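The new method is compact but worth unpacking: identifiers only come out case sensitive when the destination supports them and the casefold function preserves casing. A minimal standalone sketch (plain Python, not dlt code) of that rule:

```python
from typing import Callable

def generates_case_sensitive_identifiers(
    has_case_sensitive_identifiers: bool, casefold_identifier: Callable[[str], str]
) -> bool:
    # case sensitive support alone is not enough: the casefold function
    # must also be the identity (`str`), i.e. it must preserve casing
    return has_case_sensitive_identifiers and casefold_identifier is str

# postgres-like capabilities (see postgres/factory.py below): storage is
# case sensitive, but identifiers are lower-cased, so the result is False
assert generates_case_sensitive_identifiers(True, str.lower) is False
# identity casefolding on a case sensitive destination yields True
assert generates_case_sensitive_identifiers(True, str) is True
```
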
40 changes: 29 additions & 11 deletions dlt/common/destination/reference.py
@@ -184,6 +184,8 @@ class DestinationClientDwhConfiguration(DestinationClientConfiguration):
    """name of default schema to be used to name effective dataset to load data to"""
    replace_strategy: TLoaderReplaceStrategy = "truncate-and-insert"
    """How to handle replace disposition for this destination, can be classic or staging"""
+    staging_dataset_name_layout: str = "%s_staging"
+    """Layout for staging dataset, where %s is replaced with the dataset name. The placeholder is optional"""

    def _bind_dataset_name(
        self: TDestinationDwhClient, dataset_name: str, default_schema_name: str = None
@@ -201,21 +203,37 @@ def normalize_dataset_name(self, schema: Schema) -> str:

        If default schema name is None or equals schema.name, the schema suffix is skipped.
        """
-        if not schema.name:
+        dataset_name = self._make_dataset_name(schema.name)
+        return (
+            dataset_name
+            if not dataset_name
+            else schema.naming.normalize_table_identifier(dataset_name)
+        )
+
+    def normalize_staging_dataset_name(self, schema: Schema) -> str:
+        """Builds staging dataset name out of dataset_name and staging_dataset_name_layout."""
+        if "%s" in self.staging_dataset_name_layout:
+            # if dataset name is empty, staging dataset name is also empty
+            dataset_name = self._make_dataset_name(schema.name)
+            if not dataset_name:
[inline review comment on "if not dataset_name:"]

sh-rp (Collaborator) commented on Jul 8, 2024:
    does this ever happen? I'm reading the code and trying to understand the mechanism, but in which case would the result of _make_dataset_name be None? Shouldn't self.dataset_name always be set?

Author (Collaborator) replied:
    yes - in vector databases you can work on collections without a dataset prefix. they override the configuration, making that field optional. btw. we should support that as well in clickhouse or dremio, where we also do not have schemas.
+                return dataset_name
+            # fill the placeholder
+            dataset_name = self.staging_dataset_name_layout % dataset_name
+        else:
+            # no placeholder, then layout is a full name. so you can have a single staging dataset
+            dataset_name = self.staging_dataset_name_layout

+        return schema.naming.normalize_table_identifier(dataset_name)

+    def _make_dataset_name(self, schema_name: str) -> str:
+        if not schema_name:
+            raise ValueError("schema_name is None or empty")

        # if default schema is None then suffix is not added
-        if self.default_schema_name is not None and schema.name != self.default_schema_name:
-            # also normalize schema name. schema name is Python identifier and here convention may be different
-            return schema.naming.normalize_table_identifier(
-                (self.dataset_name or "") + "_" + schema.name
-            )
+        if self.default_schema_name is not None and schema_name != self.default_schema_name:
+            return (self.dataset_name or "") + "_" + schema_name

-        return (
-            self.dataset_name
-            if not self.dataset_name
-            else schema.naming.normalize_table_identifier(self.dataset_name)
-        )
+        return self.dataset_name


@configspec

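Since the configurable layout is the core of this PR, here is a self-contained sketch of the resolution logic above (plain Python mirroring normalize_staging_dataset_name, with the naming-convention normalization step left out; the dataset and layout values are illustrative):

```python
def staging_dataset_name(layout: str, dataset_name: str) -> str:
    if "%s" in layout:
        # placeholder present: an empty dataset name yields an empty staging
        # name (vector stores may work on collections without a dataset prefix)
        if not dataset_name:
            return dataset_name
        return layout % dataset_name
    # no placeholder: the layout itself is the full staging dataset name,
    # so many pipelines can share a single staging dataset
    return layout

assert staging_dataset_name("%s_staging", "github_events") == "github_events_staging"
assert staging_dataset_name("staging_%s", "github_events") == "staging_github_events"
assert staging_dataset_name("shared_staging", "github_events") == "shared_staging"
assert staging_dataset_name("%s_staging", "") == ""
```
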
12 changes: 9 additions & 3 deletions dlt/destinations/impl/athena/athena.py
@@ -164,7 +164,7 @@ class AthenaMergeJob(SqlMergeJob):
    @classmethod
    def _new_temp_table_name(cls, name_prefix: str, sql_client: SqlClientBase[Any]) -> str:
        # reproducible name so we know which table to drop
-        with sql_client.with_staging_dataset(staging=True):
+        with sql_client.with_staging_dataset():
            return sql_client.make_qualified_table_name(name_prefix)

    @classmethod
@@ -224,10 +224,11 @@ class AthenaSQLClient(SqlClientBase[Connection]):
    def __init__(
        self,
        dataset_name: str,
+        staging_dataset_name: str,
        config: AthenaClientConfiguration,
        capabilities: DestinationCapabilitiesContext,
    ) -> None:
-        super().__init__(None, dataset_name, capabilities)
+        super().__init__(None, dataset_name, staging_dataset_name, capabilities)
        self._conn: Connection = None
        self.config = config
        self.credentials = config.credentials
@@ -381,7 +382,12 @@ def __init__(
            table_needs_own_folder=True,
        )

-        sql_client = AthenaSQLClient(config.normalize_dataset_name(schema), config, capabilities)
+        sql_client = AthenaSQLClient(
+            config.normalize_dataset_name(schema),
+            config.normalize_staging_dataset_name(schema),
+            config,
+            capabilities,
+        )
        super().__init__(schema, config, sql_client)
        self.sql_client: AthenaSQLClient = sql_client  # type: ignore
        self.config: AthenaClientConfiguration = config

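Dropping the staging=True argument reflects the theme of this commit: the sql client is now constructed with both dataset names, so with_staging_dataset() no longer needs to be told which name to use. A sketch of the pattern under that assumption (the real implementation lives in SqlClientBase; SketchSqlClient is a made-up stand-in):

```python
from contextlib import contextmanager
from typing import Iterator

class SketchSqlClient:
    def __init__(self, dataset_name: str, staging_dataset_name: str) -> None:
        self.dataset_name = dataset_name
        self.staging_dataset_name = staging_dataset_name

    @contextmanager
    def with_staging_dataset(self) -> Iterator["SketchSqlClient"]:
        # temporarily swap in the staging dataset name, restore on exit
        saved = self.dataset_name
        self.dataset_name = self.staging_dataset_name
        try:
            yield self
        finally:
            self.dataset_name = saved

client = SketchSqlClient("events", "events_staging")
with client.with_staging_dataset():
    assert client.dataset_name == "events_staging"
assert client.dataset_name == "events"
```
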
1 change: 1 addition & 0 deletions dlt/destinations/impl/bigquery/bigquery.py
@@ -182,6 +182,7 @@ def __init__(
    ) -> None:
        sql_client = BigQuerySqlClient(
            config.normalize_dataset_name(schema),
+            config.normalize_staging_dataset_name(schema),
            config.credentials,
            capabilities,
            config.get_location(),

3 changes: 3 additions & 0 deletions dlt/destinations/impl/bigquery/configuration.py
@@ -15,6 +15,9 @@ class BigQueryClientConfiguration(DestinationClientDwhWithStagingConfiguration):
    credentials: GcpServiceAccountCredentials = None
    location: str = "US"
    has_case_sensitive_identifiers: bool = True
+    """If True, dlt expects to load data into a case sensitive dataset"""
+    should_set_case_sensitivity_on_new_dataset: bool = False
+    """If True, dlt will set a case sensitivity flag on created datasets that corresponds to the naming convention"""

    http_timeout: float = 15.0  # connection timeout for http request to BigQuery api
    file_upload_timeout: float = 30 * 60.0  # a timeout for file upload when loading local files

3 changes: 2 additions & 1 deletion dlt/destinations/impl/bigquery/factory.py
@@ -88,5 +88,6 @@ def adjust_capabilities(
        naming: t.Optional[NamingConvention],
    ) -> DestinationCapabilitiesContext:
        # modify the caps if case sensitive identifiers are requested
-        caps.has_case_sensitive_identifiers = config.has_case_sensitive_identifiers
+        if config.should_set_case_sensitivity_on_new_dataset:
+            caps.has_case_sensitive_identifiers = config.has_case_sensitive_identifiers
        return super().adjust_capabilities(caps, config, naming)

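Read together with the configuration change above: has_case_sensitive_identifiers describes how the dataset is expected to behave, while should_set_case_sensitivity_on_new_dataset controls whether dlt also propagates that setting into capabilities (and onto datasets it creates). A hedged configuration sketch (the env-var names below are derived from dlt's section-based config resolution and the field names above, not quoted from the docs):

```python
import os

# assumed mapping of config.toml sections to environment variables
os.environ["DESTINATION__BIGQUERY__HAS_CASE_SENSITIVE_IDENTIFIERS"] = "false"
os.environ["DESTINATION__BIGQUERY__SHOULD_SET_CASE_SENSITIVITY_ON_NEW_DATASET"] = "true"
```
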
3 changes: 2 additions & 1 deletion dlt/destinations/impl/bigquery/sql_client.py
@@ -78,6 +78,7 @@ class BigQuerySqlClient(SqlClientBase[bigquery.Client], DBTransaction):
    def __init__(
        self,
        dataset_name: str,
+        staging_dataset_name: str,
        credentials: GcpServiceAccountCredentialsWithoutDefaults,
        capabilities: DestinationCapabilitiesContext,
        location: str = "US",
@@ -88,7 +89,7 @@ def __init__(
        self.credentials: GcpServiceAccountCredentialsWithoutDefaults = credentials
        self.location = location
        self.http_timeout = http_timeout
-        super().__init__(credentials.project_id, dataset_name, capabilities)
+        super().__init__(credentials.project_id, dataset_name, staging_dataset_name, capabilities)

        self._default_retry = bigquery.DEFAULT_RETRY.with_deadline(retry_deadline)
        self._default_query = bigquery.QueryJobConfig(

5 changes: 4 additions & 1 deletion dlt/destinations/impl/clickhouse/clickhouse.py
@@ -295,7 +295,10 @@ def __init__(
        capabilities: DestinationCapabilitiesContext,
    ) -> None:
        self.sql_client: ClickHouseSqlClient = ClickHouseSqlClient(
-            config.normalize_dataset_name(schema), config.credentials, capabilities
+            config.normalize_dataset_name(schema),
+            config.normalize_staging_dataset_name(schema),
+            config.credentials,
+            capabilities,
        )
        super().__init__(schema, config, self.sql_client)
        self.config: ClickHouseClientConfiguration = config

3 changes: 2 additions & 1 deletion dlt/destinations/impl/clickhouse/sql_client.py
@@ -49,10 +49,11 @@ class ClickHouseSqlClient(
    def __init__(
        self,
        dataset_name: str,
+        staging_dataset_name: str,
        credentials: ClickHouseCredentials,
        capabilities: DestinationCapabilitiesContext,
    ) -> None:
-        super().__init__(credentials.database, dataset_name, capabilities)
+        super().__init__(credentials.database, dataset_name, staging_dataset_name, capabilities)
        self._conn: clickhouse_driver.dbapi.connection = None
        self.credentials = credentials
        self.database_name = credentials.database

5 changes: 4 additions & 1 deletion dlt/destinations/impl/databricks/databricks.py
@@ -261,7 +261,10 @@ def __init__(
        capabilities: DestinationCapabilitiesContext,
    ) -> None:
        sql_client = DatabricksSqlClient(
-            config.normalize_dataset_name(schema), config.credentials, capabilities
+            config.normalize_dataset_name(schema),
+            config.normalize_staging_dataset_name(schema),
+            config.credentials,
+            capabilities,
        )
        super().__init__(schema, config, sql_client)
        self.config: DatabricksClientConfiguration = config

3 changes: 2 additions & 1 deletion dlt/destinations/impl/databricks/sql_client.py
@@ -48,10 +48,11 @@ class DatabricksSqlClient(SqlClientBase[DatabricksSqlConnection], DBTransaction):
    def __init__(
        self,
        dataset_name: str,
+        staging_dataset_name: str,
        credentials: DatabricksCredentials,
        capabilities: DestinationCapabilitiesContext,
    ) -> None:
-        super().__init__(credentials.catalog, dataset_name, capabilities)
+        super().__init__(credentials.catalog, dataset_name, staging_dataset_name, capabilities)
        self._conn: DatabricksSqlConnection = None
        self.credentials = credentials

5 changes: 4 additions & 1 deletion dlt/destinations/impl/dremio/dremio.py
@@ -143,7 +143,10 @@ def __init__(
        capabilities: DestinationCapabilitiesContext,
    ) -> None:
        sql_client = DremioSqlClient(
-            config.normalize_dataset_name(schema), config.credentials, capabilities
+            config.normalize_dataset_name(schema),
+            config.normalize_staging_dataset_name(schema),
+            config.credentials,
+            capabilities,
        )
        super().__init__(schema, config, sql_client)
        self.config: DremioClientConfiguration = config

3 changes: 2 additions & 1 deletion dlt/destinations/impl/dremio/sql_client.py
@@ -37,10 +37,11 @@ class DremioSqlClient(SqlClientBase[pydremio.DremioConnection]):
    def __init__(
        self,
        dataset_name: str,
+        staging_dataset_name: str,
        credentials: DremioCredentials,
        capabilities: DestinationCapabilitiesContext,
    ) -> None:
-        super().__init__(credentials.database, dataset_name, capabilities)
+        super().__init__(credentials.database, dataset_name, staging_dataset_name, capabilities)
        self._conn: Optional[pydremio.DremioConnection] = None
        self.credentials = credentials

5 changes: 4 additions & 1 deletion dlt/destinations/impl/duckdb/duck.py
@@ -157,7 +157,10 @@ def __init__(
        capabilities: DestinationCapabilitiesContext,
    ) -> None:
        sql_client = DuckDbSqlClient(
-            config.normalize_dataset_name(schema), config.credentials, capabilities
+            config.normalize_dataset_name(schema),
+            config.normalize_staging_dataset_name(schema),
+            config.credentials,
+            capabilities,
        )
        super().__init__(schema, config, sql_client)
        self.config: DuckDbClientConfiguration = config

3 changes: 2 additions & 1 deletion dlt/destinations/impl/duckdb/sql_client.py
@@ -46,10 +46,11 @@ class DuckDbSqlClient(SqlClientBase[duckdb.DuckDBPyConnection], DBTransaction):
    def __init__(
        self,
        dataset_name: str,
+        staging_dataset_name: str,
        credentials: DuckDbBaseCredentials,
        capabilities: DestinationCapabilitiesContext,
    ) -> None:
-        super().__init__(None, dataset_name, capabilities)
+        super().__init__(None, dataset_name, staging_dataset_name, capabilities)
        self._conn: duckdb.DuckDBPyConnection = None
        self.credentials = credentials

6 changes: 5 additions & 1 deletion dlt/destinations/impl/lancedb/lancedb_client.py
@@ -454,7 +454,11 @@ def add_table_fields(
    def _execute_schema_update(self, only_tables: Iterable[str]) -> None:
        for table_name in only_tables or self.schema.tables:
            exists, existing_columns = self.get_storage_table(table_name)
-            new_columns = self.schema.get_new_table_columns(table_name, existing_columns)
+            new_columns = self.schema.get_new_table_columns(
+                table_name,
+                existing_columns,
+                self.capabilities.generates_case_sensitive_identifiers(),
+            )
            embedding_fields: List[str] = get_columns_names_with_prop(
                self.schema.get_table(table_name), VECTORIZE_HINT
            )

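The extra argument matters because column diffing changes meaning with case sensitivity. A small sketch of the comparison idea, assuming a simplified column store (this is not the get_new_table_columns implementation):

```python
from typing import Iterable, List

def new_columns(
    proposed: Iterable[str], existing: Iterable[str], case_sensitive: bool
) -> List[str]:
    # case insensitive: "Id" collides with existing "id" and is not new;
    # case sensitive: "Id" and "id" are distinct columns
    if case_sensitive:
        known = set(existing)
        return [c for c in proposed if c not in known]
    known = {c.lower() for c in existing}
    return [c for c in proposed if c.lower() not in known]

assert new_columns(["Id", "value"], ["id", "value"], case_sensitive=False) == []
assert new_columns(["Id", "value"], ["id", "value"], case_sensitive=True) == ["Id"]
```
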
5 changes: 4 additions & 1 deletion dlt/destinations/impl/motherduck/motherduck.py
@@ -16,7 +16,10 @@ def __init__(
    ) -> None:
        super().__init__(schema, config, capabilities)  # type: ignore
        sql_client = MotherDuckSqlClient(
-            config.normalize_dataset_name(schema), config.credentials, capabilities
+            config.normalize_dataset_name(schema),
+            config.normalize_staging_dataset_name(schema),
+            config.credentials,
+            capabilities,
        )
        self.config: MotherDuckClientConfiguration = config  # type: ignore
        self.sql_client: MotherDuckSqlClient = sql_client

3 changes: 2 additions & 1 deletion dlt/destinations/impl/motherduck/sql_client.py
@@ -9,10 +9,11 @@ class MotherDuckSqlClient(DuckDbSqlClient):
    def __init__(
        self,
        dataset_name: str,
+        staging_dataset_name: str,
        credentials: MotherDuckCredentials,
        capabilities: DestinationCapabilitiesContext,
    ) -> None:
-        super().__init__(dataset_name, credentials, capabilities)
+        super().__init__(dataset_name, staging_dataset_name, credentials, capabilities)
        self.database_name = credentials.database

    def catalog_name(self, escape: bool = True) -> Optional[str]:

7 changes: 5 additions & 2 deletions dlt/destinations/impl/mssql/mssql.py
@@ -95,7 +95,7 @@ def generate_sql(
    ) -> List[str]:
        sql: List[str] = []
        for table in table_chain:
-            with sql_client.with_staging_dataset(staging=True):
+            with sql_client.with_staging_dataset():
                staging_table_name = sql_client.make_qualified_table_name(table["name"])
            table_name = sql_client.make_qualified_table_name(table["name"])
            # drop destination table
@@ -149,7 +149,10 @@ def __init__(
        capabilities: DestinationCapabilitiesContext,
    ) -> None:
        sql_client = PyOdbcMsSqlClient(
-            config.normalize_dataset_name(schema), config.credentials, capabilities
+            config.normalize_dataset_name(schema),
+            config.normalize_staging_dataset_name(schema),
+            config.credentials,
+            capabilities,
        )
        super().__init__(schema, config, sql_client)
        self.config: MsSqlClientConfiguration = config

3 changes: 2 additions & 1 deletion dlt/destinations/impl/mssql/sql_client.py
@@ -45,10 +45,11 @@ class PyOdbcMsSqlClient(SqlClientBase[pyodbc.Connection], DBTransaction):
    def __init__(
        self,
        dataset_name: str,
+        staging_dataset_name: str,
        credentials: MsSqlCredentials,
        capabilities: DestinationCapabilitiesContext,
    ) -> None:
-        super().__init__(credentials.database, dataset_name, capabilities)
+        super().__init__(credentials.database, dataset_name, staging_dataset_name, capabilities)
        self._conn: pyodbc.Connection = None
        self.credentials = credentials

1 change: 0 additions & 1 deletion dlt/destinations/impl/postgres/factory.py
@@ -32,7 +32,6 @@ def _raw_capabilities(self) -> DestinationCapabilitiesContext:
        caps.casefold_identifier = str.lower
        caps.has_case_sensitive_identifiers = True
        caps.escape_literal = escape_postgres_literal
-        caps.has_case_sensitive_identifiers = True
        caps.decimal_precision = (DEFAULT_NUMERIC_PRECISION, DEFAULT_NUMERIC_SCALE)
        caps.wei_precision = (2 * EVM_DECIMAL_PRECISION, EVM_DECIMAL_PRECISION)
        caps.max_identifier_length = 63

7 changes: 5 additions & 2 deletions dlt/destinations/impl/postgres/postgres.py
@@ -95,7 +95,7 @@ def generate_sql(
    ) -> List[str]:
        sql: List[str] = []
        for table in table_chain:
-            with sql_client.with_staging_dataset(staging=True):
+            with sql_client.with_staging_dataset():
                staging_table_name = sql_client.make_qualified_table_name(table["name"])
            table_name = sql_client.make_qualified_table_name(table["name"])
            # drop destination table
@@ -211,7 +211,10 @@ def __init__(
        capabilities: DestinationCapabilitiesContext,
    ) -> None:
        sql_client = Psycopg2SqlClient(
-            config.normalize_dataset_name(schema), config.credentials, capabilities
+            config.normalize_dataset_name(schema),
+            config.normalize_staging_dataset_name(schema),
+            config.credentials,
+            capabilities,
        )
        super().__init__(schema, config, sql_client)
        self.config: PostgresClientConfiguration = config

3 changes: 2 additions & 1 deletion dlt/destinations/impl/postgres/sql_client.py
@@ -34,10 +34,11 @@ class Psycopg2SqlClient(SqlClientBase["psycopg2.connection"], DBTransaction):
    def __init__(
        self,
        dataset_name: str,
+        staging_dataset_name: str,
        credentials: PostgresCredentials,
        capabilities: DestinationCapabilitiesContext,
    ) -> None:
-        super().__init__(credentials.database, dataset_name, capabilities)
+        super().__init__(credentials.database, dataset_name, staging_dataset_name, capabilities)
        self._conn: psycopg2.connection = None
        self.credentials = credentials