Skip to content

Commit

Permalink
rewrite naive code to prevent IndexError
Browse files: browse the repository at this point in the history
  • Loading branch information
Jorrit Sandbrink committed Jan 23, 2024
1 parent 5891a9a commit 75be2ce
Show file tree
Hide file tree
Showing 2 changed files with 11 additions and 10 deletions.
14 changes: 9 additions & 5 deletions dlt/destinations/impl/synapse/configuration.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,9 @@
from typing import Final, Any, List, Dict, Optional, ClassVar

from dlt.common.configuration import configspec
from dlt.common.schema.typing import TTableIndexType, TWriteDisposition
from dlt.common import logger
from dlt.common.configuration import configspec
from dlt.common.schema.typing import TTableIndexType, TSchemaTables
from dlt.common.schema.utils import get_write_disposition

from dlt.destinations.impl.mssql.configuration import (
MsSqlCredentials,
@@ -60,13 +61,16 @@ class SynapseClientConfiguration(MsSqlClientConfiguration):
"auto_disable_concurrency",
]

def get_load_workers(self, write_disposition: TWriteDisposition, workers: int) -> int:
def get_load_workers(self, tables: TSchemaTables, workers: int) -> int:
"""Returns the adjusted number of load workers to prevent concurrency issues."""

write_dispositions = [get_write_disposition(tables, table_name) for table_name in tables]
n_replace_dispositions = len([d for d in write_dispositions if d == "replace"])
if (
write_disposition == "replace"
n_replace_dispositions > 1
and self.replace_strategy == "staging-optimized"
and workers > 1
):
print("auto_disable_concurrency:", self.auto_disable_concurrency)
warning_msg_shared = (
'Data is being loaded into Synapse with write disposition "replace"'
' and replace strategy "staging-optimized", while the number of'
Expand Down
7 changes: 2 additions & 5 deletions dlt/pipeline/pipeline.py
Original file line number Diff line number Diff line change
@@ -45,7 +45,7 @@
TAnySchemaColumns,
TSchemaContract,
)
from dlt.common.schema.utils import normalize_schema_name, get_write_disposition
from dlt.common.schema.utils import normalize_schema_name
from dlt.common.storages.exceptions import LoadPackageNotFound
from dlt.common.typing import DictStrStr, TFun, TSecretValue, is_optional_type
from dlt.common.runners import pool_runner as runner
@@ -485,10 +485,7 @@ def load(

# for synapse we might need to adjust the number of load workers
if self.destination.destination_name == "synapse":
write_disposition = get_write_disposition(
self.default_schema.tables, self.default_schema.data_table_names()[0]
)
workers = client.config.get_load_workers(write_disposition, workers) # type: ignore[attr-defined]
workers = client.config.get_load_workers(self.default_schema.tables, workers) # type: ignore[attr-defined]

# create default loader config and the loader
load_config = LoaderConfiguration(
Expand Down

0 comments on commit 75be2ce

Please sign in to comment.