reset synapse branch w/ devel #725

Closed · wants to merge 17 commits
2 changes: 1 addition & 1 deletion .github/workflows/test_airflow.yml
@@ -4,7 +4,7 @@ on:
pull_request:
branches:
- master
- devel
#- devel
workflow_dispatch:

jobs:
2 changes: 1 addition & 1 deletion .github/workflows/test_build_images.yml
@@ -4,7 +4,7 @@ on:
pull_request:
branches:
- master
- devel
#- devel
workflow_dispatch:

jobs:
2 changes: 1 addition & 1 deletion .github/workflows/test_common.yml
@@ -5,7 +5,7 @@ on:
pull_request:
branches:
- master
- devel
#- devel
workflow_dispatch:

env:
2 changes: 1 addition & 1 deletion .github/workflows/test_dbt_cloud.yml
@@ -5,7 +5,7 @@ on:
pull_request:
branches:
- master
- devel
#- devel
workflow_dispatch:

env:
2 changes: 1 addition & 1 deletion .github/workflows/test_dbt_runner.yml
@@ -5,7 +5,7 @@ on:
pull_request:
branches:
- master
- devel
#- devel
workflow_dispatch:

env:
2 changes: 1 addition & 1 deletion .github/workflows/test_destination_athena.yml
@@ -5,7 +5,7 @@ on:
pull_request:
branches:
- master
- devel
#- devel
workflow_dispatch:

env:
2 changes: 1 addition & 1 deletion .github/workflows/test_destination_athena_iceberg.yml
@@ -5,7 +5,7 @@ on:
pull_request:
branches:
- master
- devel
#- devel
workflow_dispatch:

env:
2 changes: 1 addition & 1 deletion .github/workflows/test_destination_bigquery.yml
@@ -5,7 +5,7 @@ on:
pull_request:
branches:
- master
- devel
#- devel
workflow_dispatch:

env:
2 changes: 1 addition & 1 deletion .github/workflows/test_destination_snowflake.yml
@@ -5,7 +5,7 @@ on:
pull_request:
branches:
- master
- devel
#- devel
workflow_dispatch:

env:
20 changes: 7 additions & 13 deletions .github/workflows/test_destination_synapse.yml
@@ -5,9 +5,8 @@ on:
branches:
- master
- devel

workflow_dispatch:

env:
DESTINATION__SYNAPSE__CREDENTIALS: ${{ secrets.SYNAPSE_CREDENTIALS }}
DESTINATION__SYNAPSE__CREDENTIALS__PASSWORD: ${{ secrets.SYNAPSE_PASSWORD }}
@@ -19,19 +18,14 @@ env:
ALL_FILESYSTEM_DRIVERS: "[\"memory\"]"

jobs:

build:
runs-on: ubuntu-latest

steps:
- name: Check source branch name
run: |
if [[ "${{ github.head_ref }}" != "synapse" ]]; then
exit 1
fi
get_docs_changes:
uses: ./.github/workflows/get_docs_changes.yml
if: ${{ !github.event.pull_request.head.repo.fork }}

run_loader:
name: Tests Synapse loader
needs: get_docs_changes
if: needs.get_docs_changes.outputs.changes_outside_docs == 'true'
strategy:
fail-fast: false
matrix:
@@ -42,7 +36,7 @@ jobs:
runs-on: ${{ matrix.os }}

steps:

- name: Check out
uses: actions/checkout@master

2 changes: 1 addition & 1 deletion .github/workflows/test_destination_weaviate.yml
@@ -4,7 +4,7 @@ on:
pull_request:
branches:
- master
- devel
#- devel
workflow_dispatch:

env:
2 changes: 1 addition & 1 deletion .github/workflows/test_destinations.yml
@@ -5,7 +5,7 @@ on:
pull_request:
branches:
- master
- devel
#- devel
workflow_dispatch:

env:
2 changes: 1 addition & 1 deletion .github/workflows/test_doc_snippets.yml
@@ -5,7 +5,7 @@ on:
pull_request:
branches:
- master
- devel
#- devel
workflow_dispatch:

env:
2 changes: 1 addition & 1 deletion .github/workflows/test_local_destinations.yml
@@ -7,7 +7,7 @@ on:
pull_request:
branches:
- master
- devel
#- devel
workflow_dispatch:

env:
36 changes: 36 additions & 0 deletions dlt/common/data_writers/escape.py
@@ -97,6 +97,42 @@ def escape_mssql_literal(v: Any) -> Any:
    return str(v)


# TODO needs improvement for SQL injection, combine with mssql handling
def escape_synapse_literal(v: Any) -> Any:
    if isinstance(v, str):
        # Use the _escape_extended function to escape the string
        return _escape_extended(v, prefix="N'", escape_dict=SYNAPSE_ESCAPE_DICT)
    if isinstance(v, (datetime, date, time)):
        return f"'{v.isoformat()}'"
    if isinstance(v, (list, dict)):
        # Serialize the list or dict to JSON and then escape it
        return _escape_extended(json.dumps(v), prefix="N'", escape_dict=SYNAPSE_ESCAPE_DICT)
    if isinstance(v, bytes):
        hex_string = v.hex()
        return f"0x{hex_string}"
    if isinstance(v, bool):
        return str(int(v))
    if v is None:
        return "NULL"
    return str(v)


# TODO potentially combine with mssql
SYNAPSE_ESCAPE_DICT = {
    "'": "''",
    '\n': "' + CHAR(10) + N'",
    '\r': "' + CHAR(13) + N'",
    '\t': "' + CHAR(9) + N'",
    "\\": "\\",
}

SYNAPSE_SQL_ESCAPE_RE = _make_sql_escape_re(SYNAPSE_ESCAPE_DICT)


def escape_synapse_identifier(v: str) -> str:
    return '"' + v.replace('"', '') + '"'


def escape_redshift_identifier(v: str) -> str:
    return '"' + v.replace('"', '""').replace("\\", "\\\\") + '"'

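For context, a minimal usage sketch (not part of the diff) of the new Synapse escape helpers. It assumes dlt with this branch is installed; the exact strings produced depend on the existing _escape_extended helper, so behavior is only indicated in comments.

# Illustrative only: print how a few Python values are rendered by the new helpers.
from datetime import datetime

from dlt.common.data_writers.escape import escape_synapse_identifier, escape_synapse_literal

samples = [
    "O'Brien",                    # embedded single quote is doubled
    "line1\nline2",               # newline is spliced back in via CHAR(10)
    {"key": "value"},             # lists/dicts are JSON-serialized, then escaped
    b"\x01\x02",                  # bytes become a 0x... hex literal
    True,                         # booleans become 1 / 0
    None,                         # None becomes NULL
    datetime(2023, 1, 1, 12, 0),  # datetimes become a quoted ISO string
]
for value in samples:
    print(repr(value), "->", escape_synapse_literal(value))

# identifiers are double-quoted and embedded double quotes are stripped
print(escape_synapse_identifier('my "weird" column'))
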
52 changes: 52 additions & 0 deletions dlt/destinations/synapse/__init__.py
@@ -0,0 +1,52 @@
from typing import Type

from dlt.common.schema.schema import Schema
from dlt.common.configuration import with_config, known_sections
from dlt.common.configuration.accessors import config
from dlt.common.data_writers.escape import escape_postgres_identifier, escape_synapse_literal, escape_synapse_identifier, escape_mssql_literal
from dlt.common.destination import DestinationCapabilitiesContext
from dlt.common.destination.reference import JobClientBase, DestinationClientConfiguration
from dlt.common.arithmetics import DEFAULT_NUMERIC_PRECISION, DEFAULT_NUMERIC_SCALE
from dlt.common.wei import EVM_DECIMAL_PRECISION

from dlt.destinations.synapse.configuration import SynapseClientConfiguration


@with_config(spec=SynapseClientConfiguration, sections=(known_sections.DESTINATION, "synapse",))
def _configure(config: SynapseClientConfiguration = config.value) -> SynapseClientConfiguration:
    return config


def capabilities() -> DestinationCapabilitiesContext:
    # https://learn.microsoft.com/en-us/azure/synapse-analytics/sql-data-warehouse/sql-data-warehouse-service-capacity-limits
    caps = DestinationCapabilitiesContext()
    caps.preferred_loader_file_format = "insert_values"
    caps.supported_loader_file_formats = ["insert_values"]
    caps.preferred_staging_file_format = "parquet"
    caps.supported_staging_file_formats = ["parquet"]
    caps.escape_identifier = escape_synapse_identifier
    caps.escape_literal = escape_synapse_literal
    caps.decimal_precision = (DEFAULT_NUMERIC_PRECISION, DEFAULT_NUMERIC_SCALE)
    caps.wei_precision = (DEFAULT_NUMERIC_PRECISION, 0)
    # https://learn.microsoft.com/en-us/sql/sql-server/maximum-capacity-specifications-for-sql-server?view=sql-server-ver16&redirectedfrom=MSDN
    caps.max_identifier_length = 128
    caps.max_column_identifier_length = 128
    caps.max_query_length = 4 * 1024 * 64 * 1024
    caps.is_max_query_length_in_bytes = True
    caps.max_text_data_type_length = 2 ** 30 - 1
    caps.is_max_text_data_type_length_in_bytes = False
    caps.supports_ddl_transactions = False
    caps.max_rows_per_insert = 1000

    return caps


def client(schema: Schema, initial_config: DestinationClientConfiguration = config.value) -> JobClientBase:
    # import client when creating instance so capabilities and config specs can be accessed without dependencies installed
    from dlt.destinations.synapse.synapse import SynapseClient

    return SynapseClient(schema, _configure(initial_config))  # type: ignore[arg-type]


def spec() -> Type[DestinationClientConfiguration]:
    return SynapseClientConfiguration
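
A brief sketch (not part of the diff) of how the factory functions above can be exercised once the module is importable; the pipeline call is shown only as a comment since wiring the new destination name into dlt.pipeline happens outside this file.

# Illustrative only: inspect the declared capabilities of the new destination.
from dlt.destinations import synapse

caps = synapse.capabilities()
print(caps.preferred_loader_file_format)  # "insert_values"
print(caps.max_rows_per_insert)           # 1000 rows per INSERT ... VALUES batch
print(caps.max_identifier_length)         # 128, the SQL Server identifier limit

# In a pipeline the destination would then be selected by name, e.g.:
# pipeline = dlt.pipeline(pipeline_name="demo", destination="synapse", dataset_name="demo_data")
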
89 changes: 89 additions & 0 deletions dlt/destinations/synapse/configuration.py
@@ -0,0 +1,89 @@
from typing import Final, ClassVar, Any, List, Optional
from sqlalchemy.engine import URL

from dlt.common.configuration import configspec
from dlt.common.configuration.specs import ConnectionStringCredentials
from dlt.common.utils import digest128
from dlt.common.typing import TSecretValue
from dlt.common.exceptions import SystemConfigurationException

from dlt.common.destination.reference import DestinationClientDwhWithStagingConfiguration


@configspec
class SynapseCredentials(ConnectionStringCredentials):
    drivername: Final[str] = "sqlserver"  # type: ignore
    user: Final[str] = "dltadmin"
    password: TSecretValue
    host: str
    port: int = 1433
    connect_timeout: int = 15
    odbc_driver: str = "ODBC Driver 17 for SQL Server"

    __config_gen_annotations__: ClassVar[List[str]] = ["port", "connect_timeout"]

    def parse_native_representation(self, native_value: Any) -> None:
        # TODO: Support ODBC connection string or sqlalchemy URL
        super().parse_native_representation(native_value)
        self.connect_timeout = int(self.query.get("connect_timeout", self.connect_timeout))
        if not self.is_partial():
            self.resolve()

    def on_resolved(self) -> None:
        self.database = self.database.lower()

    def to_url(self) -> URL:
        # sqlalchemy URL objects are immutable: update_query_pairs returns a new URL
        url = super().to_url()
        return url.update_query_pairs([("connect_timeout", str(self.connect_timeout))])

    def on_partial(self) -> None:
        self.odbc_driver = self._get_odbc_driver()
        if not self.is_partial():
            self.resolve()

    def _get_odbc_driver(self) -> str:
        if self.odbc_driver:
            return self.odbc_driver
        # Pick a default driver if available
        supported_drivers = ['ODBC Driver 18 for SQL Server', 'ODBC Driver 17 for SQL Server']
        import pyodbc
        available_drivers = pyodbc.drivers()
        for driver in supported_drivers:
            if driver in available_drivers:
                return driver
        docs_url = "https://learn.microsoft.com/en-us/sql/connect/odbc/download-odbc-driver-for-sql-server?view=sql-server-ver16"
        raise SystemConfigurationException(
            f"No supported ODBC driver found for MS SQL Server. "
            f"See {docs_url} for information on how to install the '{supported_drivers[0]}' on your platform."
        )

    def to_odbc_dsn(self) -> str:
        params = {
            "DRIVER": self.odbc_driver,
            "SERVER": self.host,
            "PORT": self.port,
            "DATABASE": self.database,
            "UID": self.username,
            "PWD": self.password,
            "LongAsMax": "yes",
            "MARS_Connection": "yes"
        }
        if self.query:
            params.update(self.query)
        return ";".join([f"{k}={v}" for k, v in params.items()])


@configspec
class SynapseClientConfiguration(DestinationClientDwhWithStagingConfiguration):
    destination_name: Final[str] = "synapse"  # type: ignore
    credentials: SynapseCredentials

    create_indexes: bool = False

    def fingerprint(self) -> str:
        """Returns a fingerprint of host part of a connection string"""
        if self.credentials and self.credentials.host:
            return digest128(self.credentials.host)
        return ""