Synapse destination initial commit
Jorrit Sandbrink committed Jan 18, 2024
1 parent 77b070d commit 7bc2163
Showing 28 changed files with 672 additions and 72 deletions.
22 changes: 8 additions & 14 deletions .github/workflows/test_destination_synapse.yml
@@ -5,7 +5,6 @@ on:
branches:
- master
- devel

workflow_dispatch:

env:
@@ -18,19 +17,14 @@ env:
ALL_FILESYSTEM_DRIVERS: "[\"memory\"]"

jobs:

build:
runs-on: ubuntu-latest

steps:
- name: Check source branch name
run: |
if [[ "${{ github.head_ref }}" != "synapse" ]]; then
exit 1
fi
get_docs_changes:
uses: ./.github/workflows/get_docs_changes.yml
if: ${{ !github.event.pull_request.head.repo.fork }}

run_loader:
name: Tests Synapse loader
needs: get_docs_changes
if: needs.get_docs_changes.outputs.changes_outside_docs == 'true'
strategy:
fail-fast: false
matrix:
@@ -69,17 +63,17 @@ jobs:
key: venv-${{ runner.os }}-${{ steps.setup-python.outputs.python-version }}-${{ hashFiles('**/poetry.lock') }}-gcp

- name: Install dependencies
run: poetry install --no-interaction -E synapse -E s3 -E gs -E az --with sentry-sdk --with pipeline
run: poetry install --no-interaction -E synapse -E parquet --with sentry-sdk --with pipeline

- name: create secrets.toml
run: pwd && echo "$DLT_SECRETS_TOML" > tests/.dlt/secrets.toml

- run: |
poetry run pytest tests/load --ignore tests/load/pipeline/test_dbt_helper.py
poetry run pytest tests/load
if: runner.os != 'Windows'
name: Run tests Linux/MAC
- run: |
poetry run pytest tests/load --ignore tests/load/pipeline/test_dbt_helper.py
poetry run pytest tests/load
if: runner.os == 'Windows'
name: Run tests Windows
shell: cmd
10 changes: 8 additions & 2 deletions dlt/common/data_writers/escape.py
@@ -98,8 +98,14 @@ def escape_mssql_literal(v: Any) -> Any:
json.dumps(v), prefix="N'", escape_dict=MS_SQL_ESCAPE_DICT, escape_re=MS_SQL_ESCAPE_RE
)
if isinstance(v, bytes):
base_64_string = base64.b64encode(v).decode("ascii")
return f"""CAST('' AS XML).value('xs:base64Binary("{base_64_string}")', 'VARBINARY(MAX)')"""
# 8000 is the max value for n in VARBINARY(n)
# https://learn.microsoft.com/en-us/sql/t-sql/data-types/binary-and-varbinary-transact-sql
if len(v) <= 8000:
n = len(v)
else:
n = "MAX"
return f"CONVERT(VARBINARY({n}), '{v.hex()}', 2)"

if isinstance(v, bool):
return str(int(v))
if v is None:
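The new branch replaces the XML `base64Binary` cast with a hex-based `CONVERT`. A standalone sketch of the logic, with an illustrative function name (the real code lives inside `escape_mssql_literal`):

```python
# Minimal sketch of the new bytes branch; mirrors the diff above.
def escape_bytes_literal(v: bytes) -> str:
    # 8000 is the max value for n in VARBINARY(n); anything larger needs MAX.
    n = len(v) if len(v) <= 8000 else "MAX"
    # Style 2 tells CONVERT to read the string as hex without a "0x" prefix.
    return f"CONVERT(VARBINARY({n}), '{v.hex()}', 2)"

print(escape_bytes_literal(b"\x01\x02"))
# CONVERT(VARBINARY(2), '0102', 2)
```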
24 changes: 17 additions & 7 deletions dlt/common/data_writers/writers.py
@@ -175,18 +175,29 @@ def write_header(self, columns_schema: TTableSchemaColumns) -> None:
# do not write INSERT INTO command, this must be added together with table name by the loader
self._f.write("INSERT INTO {}(")
self._f.write(",".join(map(self._caps.escape_identifier, headers)))
self._f.write(")\nVALUES\n")
if self._caps.insert_values_writer_type == "default":
self._f.write(")\nVALUES\n")
elif self._caps.insert_values_writer_type == "select_union":
self._f.write(")\n")

def write_data(self, rows: Sequence[Any]) -> None:
super().write_data(rows)

def write_row(row: StrAny) -> None:
def write_row(row: StrAny, last_row: bool = False) -> None:
output = ["NULL"] * len(self._headers_lookup)
for n, v in row.items():
output[self._headers_lookup[n]] = self._caps.escape_literal(v)
self._f.write("(")
self._f.write(",".join(output))
self._f.write(")")
if self._caps.insert_values_writer_type == "default":
self._f.write("(")
self._f.write(",".join(output))
self._f.write(")")
if not last_row:
self._f.write(",\n")
elif self._caps.insert_values_writer_type == "select_union":
self._f.write("SELECT ")
self._f.write(",".join(output))
if not last_row:
self._f.write("\nUNION ALL\n")

# if next chunk add separator
if self._chunks_written > 0:
@@ -195,10 +206,9 @@ def write_row(row: StrAny) -> None:
# write rows
for row in rows[:-1]:
write_row(row)
self._f.write(",\n")

# write last row without separator so we can write footer eventually
write_row(rows[-1])
write_row(rows[-1], last_row=True)
self._chunks_written += 1

def write_footer(self) -> None:
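To make the two writer modes concrete, here is a runnable sketch of the SQL each one produces (column names and rows are made up; the real writer streams through `self._f` and uses the destination's escaping):

```python
# Illustrative sketch of the two insert_values writer modes; not the writer's API.
rows = [("a", 1), ("b", 2)]

def render(writer_type: str) -> str:
    if writer_type == "default":
        # classic multi-row VALUES clause
        body = ",\n".join(f"({v!r},{n})" for v, n in rows)
        return "INSERT INTO {}(col1,col2)\nVALUES\n" + body
    # "select_union": each row becomes a SELECT joined with UNION ALL,
    # since Synapse rejects multi-row VALUES (https://stackoverflow.com/a/77014299)
    body = "\nUNION ALL\n".join(f"SELECT {v!r},{n}" for v, n in rows)
    return "INSERT INTO {}(col1,col2)\n" + body

print(render("select_union"))
# INSERT INTO {}(col1,col2)
# SELECT 'a',1
# UNION ALL
# SELECT 'b',2
```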
1 change: 1 addition & 0 deletions dlt/common/destination/capabilities.py
@@ -52,6 +52,7 @@ class DestinationCapabilitiesContext(ContainerInjectableContext):
schema_supports_numeric_precision: bool = True
timestamp_precision: int = 6
max_rows_per_insert: Optional[int] = None
insert_values_writer_type: str = "default"

# do not allow to create default value, destination caps must be always explicitly inserted into container
can_create_default: ClassVar[bool] = False
2 changes: 2 additions & 0 deletions dlt/destinations/__init__.py
@@ -10,6 +10,7 @@
from dlt.destinations.impl.qdrant.factory import qdrant
from dlt.destinations.impl.motherduck.factory import motherduck
from dlt.destinations.impl.weaviate.factory import weaviate
from dlt.destinations.impl.synapse.factory import synapse


__all__ = [
@@ -25,4 +26,5 @@
"qdrant",
"motherduck",
"weaviate",
"synapse",
]
31 changes: 19 additions & 12 deletions dlt/destinations/impl/mssql/configuration.py
@@ -1,4 +1,4 @@
from typing import Final, ClassVar, Any, List, Optional, TYPE_CHECKING
from typing import Final, ClassVar, Any, List, Dict, Optional, TYPE_CHECKING
from sqlalchemy.engine import URL

from dlt.common.configuration import configspec
@@ -10,9 +10,6 @@
from dlt.common.destination.reference import DestinationClientDwhWithStagingConfiguration


SUPPORTED_DRIVERS = ["ODBC Driver 18 for SQL Server", "ODBC Driver 17 for SQL Server"]


@configspec
class MsSqlCredentials(ConnectionStringCredentials):
drivername: Final[str] = "mssql" # type: ignore
@@ -24,22 +21,27 @@ class MsSqlCredentials(ConnectionStringCredentials):

__config_gen_annotations__: ClassVar[List[str]] = ["port", "connect_timeout"]

SUPPORTED_DRIVERS: ClassVar[List[str]] = [
"ODBC Driver 18 for SQL Server",
"ODBC Driver 17 for SQL Server",
]

def parse_native_representation(self, native_value: Any) -> None:
# TODO: Support ODBC connection string or sqlalchemy URL
super().parse_native_representation(native_value)
if self.query is not None:
self.query = {k.lower(): v for k, v in self.query.items()} # Make case-insensitive.
if "driver" in self.query and self.query.get("driver") not in SUPPORTED_DRIVERS:
raise SystemConfigurationException(
f"""The specified driver "{self.query.get('driver')}" is not supported."""
f" Choose one of the supported drivers: {', '.join(SUPPORTED_DRIVERS)}."
)
self.driver = self.query.get("driver", self.driver)
self.connect_timeout = int(self.query.get("connect_timeout", self.connect_timeout))
if not self.is_partial():
self.resolve()

def on_resolved(self) -> None:
if self.driver not in self.SUPPORTED_DRIVERS:
raise SystemConfigurationException(
f"""The specified driver "{self.driver}" is not supported."""
f" Choose one of the supported drivers: {', '.join(self.SUPPORTED_DRIVERS)}."
)
self.database = self.database.lower()

def to_url(self) -> URL:
@@ -55,20 +57,21 @@ def on_partial(self) -> None:
def _get_driver(self) -> str:
if self.driver:
return self.driver

# Pick a default driver if available
import pyodbc

available_drivers = pyodbc.drivers()
for d in SUPPORTED_DRIVERS:
for d in self.SUPPORTED_DRIVERS:
if d in available_drivers:
return d
docs_url = "https://learn.microsoft.com/en-us/sql/connect/odbc/download-odbc-driver-for-sql-server?view=sql-server-ver16"
raise SystemConfigurationException(
f"No supported ODBC driver found for MS SQL Server. See {docs_url} for information on"
f" how to install the '{SUPPORTED_DRIVERS[0]}' on your platform."
f" how to install the '{self.SUPPORTED_DRIVERS[0]}' on your platform."
)

def to_odbc_dsn(self) -> str:
def _get_odbc_dsn_dict(self) -> Dict[str, Any]:
params = {
"DRIVER": self.driver,
"SERVER": f"{self.host},{self.port}",
@@ -78,6 +81,10 @@ def to_odbc_dsn(self) -> str:
}
if self.query is not None:
params.update({k.upper(): v for k, v in self.query.items()})
return params

def to_odbc_dsn(self) -> str:
params = self._get_odbc_dsn_dict()
return ";".join([f"{k}={v}" for k, v in params.items()])


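A hedged usage sketch of the refactored credentials (host, database, and password are placeholders, and the exact URL escaping accepted for the driver name may vary):

```python
# Hypothetical usage; all connection values below are placeholders.
from dlt.destinations.impl.mssql.configuration import MsSqlCredentials

creds = MsSqlCredentials()
creds.parse_native_representation(
    "mssql://loader:<password>@loader.database.windows.net:1433/dlt_data"
    "?DRIVER=ODBC Driver 18 for SQL Server"
)
# resolve() is triggered at the end of parsing; on_resolved() now validates
# the driver against the class-level SUPPORTED_DRIVERS and lowercases the
# database name.
print(creds.to_odbc_dsn())
# DRIVER=ODBC Driver 18 for SQL Server;SERVER=loader.database.windows.net,1433;...
```

Moving `SUPPORTED_DRIVERS` onto the class lets subclasses narrow the list, which the Synapse credentials below use to require ODBC Driver 18.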
7 changes: 5 additions & 2 deletions dlt/destinations/impl/mssql/sql_client.py
@@ -106,8 +106,8 @@ def drop_dataset(self) -> None:
)
table_names = [row[0] for row in rows]
self.drop_tables(*table_names)

self.execute_sql("DROP SCHEMA IF EXISTS %s;" % self.fully_qualified_dataset_name())
# Drop schema
self._drop_schema()

def _drop_views(self, *tables: str) -> None:
if not tables:
@@ -117,6 +117,9 @@ def _drop_views(self, *tables: str) -> None:
]
self.execute_fragments(statements)

def _drop_schema(self) -> None:
self.execute_sql("DROP SCHEMA IF EXISTS %s;" % self.fully_qualified_dataset_name())

def execute_sql(
self, sql: AnyStr, *args: Any, **kwargs: Any
) -> Optional[Sequence[Sequence[Any]]]:
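The schema drop is split into its own `_drop_schema` method, presumably so destination-specific clients can override just that step. A runnable toy sketch of the resulting flow (everything except `drop_dataset` and `_drop_schema` is a stand-in):

```python
from typing import List

class SqlClientSketch:
    """Toy stand-in for the mssql sql client; prints instead of executing."""

    def __init__(self, dataset: str, tables: List[str]) -> None:
        self.dataset = dataset
        self.tables = tables

    def execute_sql(self, sql: str) -> None:
        print(sql)  # the real client runs this against the database

    def drop_tables(self, *tables: str) -> None:
        for t in tables:
            self.execute_sql(f"DROP TABLE IF EXISTS {self.dataset}.{t};")

    def drop_dataset(self) -> None:
        self.drop_tables(*self.tables)
        self._drop_schema()  # isolated so subclasses can override it

    def _drop_schema(self) -> None:
        self.execute_sql(f"DROP SCHEMA IF EXISTS {self.dataset};")

SqlClientSketch("demo_data", ["users"]).drop_dataset()
# DROP TABLE IF EXISTS demo_data.users;
# DROP SCHEMA IF EXISTS demo_data;
```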
58 changes: 58 additions & 0 deletions dlt/destinations/impl/synapse/README.md
@@ -0,0 +1,58 @@
# Set up loader user
Execute the following SQL statements to set up the [loader](https://learn.microsoft.com/en-us/azure/synapse-analytics/sql/data-loading-best-practices#create-a-loading-user) user:
```sql
-- on master database

CREATE LOGIN loader WITH PASSWORD = 'YOUR_LOADER_PASSWORD_HERE';
```

```sql
-- on minipool database

CREATE USER loader FOR LOGIN loader;

-- DDL permissions
GRANT CREATE TABLE ON DATABASE :: minipool TO loader;
GRANT CREATE VIEW ON DATABASE :: minipool TO loader;

-- DML permissions
GRANT SELECT ON DATABASE :: minipool TO loader;
GRANT INSERT ON DATABASE :: minipool TO loader;
GRANT ADMINISTER DATABASE BULK OPERATIONS TO loader;
```

```sql
-- https://learn.microsoft.com/en-us/azure/synapse-analytics/sql-data-warehouse/sql-data-warehouse-workload-isolation

CREATE WORKLOAD GROUP DataLoads
WITH (
MIN_PERCENTAGE_RESOURCE = 0
,CAP_PERCENTAGE_RESOURCE = 50
,REQUEST_MIN_RESOURCE_GRANT_PERCENT = 25
);

CREATE WORKLOAD CLASSIFIER [wgcELTLogin]
WITH (
WORKLOAD_GROUP = 'DataLoads'
,MEMBERNAME = 'loader'
);
```

# config.toml
```toml
[destination.synapse.credentials]
database = "minipool"
username = "loader"
host = "dlt-synapse-ci.sql.azuresynapse.net"
port = 1433
driver = "ODBC Driver 18 for SQL Server"

[destination.synapse]
create_indexes = false
```

# secrets.toml
```toml
[destination.synapse.credentials]
password = "YOUR_LOADER_PASSWORD_HERE"
```
46 changes: 46 additions & 0 deletions dlt/destinations/impl/synapse/__init__.py
@@ -0,0 +1,46 @@
from dlt.common.data_writers.escape import escape_postgres_identifier, escape_mssql_literal
from dlt.common.destination import DestinationCapabilitiesContext
from dlt.common.arithmetics import DEFAULT_NUMERIC_PRECISION, DEFAULT_NUMERIC_SCALE
from dlt.common.wei import EVM_DECIMAL_PRECISION


def capabilities() -> DestinationCapabilitiesContext:
    caps = DestinationCapabilitiesContext()

    caps.preferred_loader_file_format = "insert_values"
    caps.supported_loader_file_formats = ["insert_values"]
    caps.preferred_staging_file_format = None
    caps.supported_staging_file_formats = []

    caps.insert_values_writer_type = "select_union"  # https://stackoverflow.com/a/77014299

    caps.escape_identifier = escape_postgres_identifier
    caps.escape_literal = escape_mssql_literal

    # Synapse has a max precision of 38
    # https://learn.microsoft.com/en-us/sql/t-sql/statements/create-table-azure-sql-data-warehouse?view=aps-pdw-2016-au7#DataTypes
    caps.decimal_precision = (DEFAULT_NUMERIC_PRECISION, DEFAULT_NUMERIC_SCALE)
    caps.wei_precision = (DEFAULT_NUMERIC_PRECISION, 0)

    # https://learn.microsoft.com/en-us/sql/t-sql/statements/create-table-azure-sql-data-warehouse?view=aps-pdw-2016-au7#LimitationsRestrictions
    caps.max_identifier_length = 128
    caps.max_column_identifier_length = 128

    # https://learn.microsoft.com/en-us/azure/synapse-analytics/sql-data-warehouse/sql-data-warehouse-service-capacity-limits#queries
    caps.max_query_length = 65536 * 4096
    caps.is_max_query_length_in_bytes = True

    # nvarchar(max) can store 2 GB
    # https://learn.microsoft.com/en-us/sql/t-sql/data-types/nchar-and-nvarchar-transact-sql?view=sql-server-ver16#nvarchar---n--max--
    caps.max_text_data_type_length = 2 * 1024 * 1024 * 1024
    caps.is_max_text_data_type_length_in_bytes = True

    # https://learn.microsoft.com/en-us/azure/synapse-analytics/sql-data-warehouse/sql-data-warehouse-develop-transactions
    caps.supports_transactions = True
    caps.supports_ddl_transactions = False

    # datetimeoffset can store 7 digits for fractional seconds
    # https://learn.microsoft.com/en-us/sql/t-sql/data-types/datetimeoffset-transact-sql?view=sql-server-ver16
    caps.timestamp_precision = 7

    return caps
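As a sanity check, the query-length cap set above works out to 256 MiB:

```python
# 65,536 * 4,096 bytes = 268,435,456 bytes = 256 MiB
max_query_length = 65536 * 4096
assert max_query_length == 268_435_456
print(max_query_length // (1024 * 1024), "MiB")  # 256 MiB
```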
38 changes: 38 additions & 0 deletions dlt/destinations/impl/synapse/configuration.py
@@ -0,0 +1,38 @@
from typing import Final, Any, List, Dict, Optional, ClassVar

from dlt.common.configuration import configspec

from dlt.destinations.impl.mssql.configuration import (
    MsSqlCredentials,
    MsSqlClientConfiguration,
)


@configspec
class SynapseCredentials(MsSqlCredentials):
    drivername: Final[str] = "synapse"  # type: ignore

    # The LongAsMax keyword was introduced in ODBC Driver 18 for SQL Server.
    SUPPORTED_DRIVERS: ClassVar[List[str]] = ["ODBC Driver 18 for SQL Server"]

    def _get_odbc_dsn_dict(self) -> Dict[str, Any]:
        params = super()._get_odbc_dsn_dict()
        # Long types (text, ntext, image) are not supported on Synapse.
        # Convert to max types using the LongAsMax keyword.
        # https://stackoverflow.com/a/57926224
        params["LONGASMAX"] = "yes"
        return params


@configspec
class SynapseClientConfiguration(MsSqlClientConfiguration):
    destination_type: Final[str] = "synapse"  # type: ignore
    credentials: SynapseCredentials

    # Determines if `primary_key` and `unique` column hints are applied.
    # Set to False by default because the PRIMARY KEY and UNIQUE constraints
    # are tricky in Synapse: they are NOT ENFORCED and can lead to inaccurate
    # results if the user does not ensure all column values are unique.
    # https://learn.microsoft.com/en-us/azure/synapse-analytics/sql-data-warehouse/sql-data-warehouse-table-constraints
    create_indexes: bool = False
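A small sketch of the DSN string the Synapse override produces, with `LONGASMAX` appended to the parameters inherited from `_get_odbc_dsn_dict` (all values are placeholders):

```python
# Illustrative DSN assembly; mirrors to_odbc_dsn() with placeholder values.
params = {
    "DRIVER": "ODBC Driver 18 for SQL Server",
    "SERVER": "dlt-synapse-ci.sql.azuresynapse.net,1433",
    "DATABASE": "minipool",
    "UID": "loader",
    "PWD": "***",
    # added by SynapseCredentials: maps long types to (n)varchar(max)/varbinary(max)
    "LONGASMAX": "yes",
}
print(";".join(f"{k}={v}" for k, v in params.items()))
# DRIVER=ODBC Driver 18 for SQL Server;SERVER=...;DATABASE=minipool;...;LONGASMAX=yes
```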
