Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Synapse #708

Closed
wants to merge 22 commits into from
Closed
Show file tree
Hide file tree
Changes from 7 commits
Commits
Show all changes
22 commits
Select commit Hold shift + click to select a range
93723b4
Synapse init file, leveraging 'escape_mssql_literal'
eryanRM Oct 24, 2023
44929b1
sql_client: passing most tests
eryanRM Oct 24, 2023
1271547
synapse setup
eryanRM Oct 24, 2023
be41558
update escape character, text mapping handling
eryanRM Oct 24, 2023
b27008f
updating file_format param for test utils destinations_config
eryanRM Oct 24, 2023
ad3eee8
migrating synapse handling from old branch
eryanRM Oct 24, 2023
1ad41b3
updated test_execute_df to have synapse insert handling -- can update…
eryanRM Oct 24, 2023
0c3f132
Devel sync pt 1: aws creds, config, conftest, test_buffered_writer, t…
eryanRM Oct 24, 2023
db08468
Devel sync pt 2: dlt/common
eryanRM Oct 24, 2023
0a5d4b3
Devel sync pt 3: tests/load, dlt/extract, dlt/destinations/filesystem
eryanRM Oct 24, 2023
d9627d4
Devel sync pt 4: .md files / docs
eryanRM Oct 25, 2023
b4067d7
Devel sync pt 5: tests
eryanRM Oct 25, 2023
84e46a6
Devel sync pt 6: dlt/extract
eryanRM Oct 25, 2023
a62fef5
synapse/sql_client: remove logging, revert tx handling, revert execut…
eryanRM Oct 25, 2023
c0661e7
migrated logic into synapse.py and updated sql_client.py to mirror mssql
eryanRM Oct 26, 2023
d48349d
removing print statements
eryanRM Oct 26, 2023
be05abb
generate_insert_query cleanup (SQL injection)
eryanRM Oct 26, 2023
77ebcda
cleaned logging and print statements for easier review
eryanRM Oct 27, 2023
b4a7798
devel sync
eryanRM Oct 27, 2023
754541b
updated handling for '\n' from pyodbc and () for row tuples
eryanRM Oct 29, 2023
b65053e
reverting insert_job_client logic (confirm Synapse handling)
eryanRM Oct 29, 2023
2adc3b2
passing assertion errors and correctly inserting rows
eryanRM Oct 30, 2023
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions dlt/destinations/synapse/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
# loader account setup

1. Create new database `CREATE DATABASE dlt_data`
2. Create new user, set password `CREATE USER loader WITH PASSWORD 'loader';`
3. Make the new user the database owner (a more restricted permission set could be used instead) `ALTER DATABASE dlt_data OWNER TO loader`
51 changes: 51 additions & 0 deletions dlt/destinations/synapse/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
from typing import Type

from dlt.common.schema.schema import Schema
from dlt.common.configuration import with_config, known_sections
from dlt.common.configuration.accessors import config
from dlt.common.data_writers.escape import escape_postgres_identifier, escape_mssql_literal
from dlt.common.destination import DestinationCapabilitiesContext
from dlt.common.destination.reference import JobClientBase, DestinationClientConfiguration
from dlt.common.arithmetics import DEFAULT_NUMERIC_PRECISION, DEFAULT_NUMERIC_SCALE
from dlt.common.wei import EVM_DECIMAL_PRECISION

from dlt.destinations.synapse.configuration import SynapseClientConfiguration


@with_config(spec=SynapseClientConfiguration, sections=(known_sections.DESTINATION, "synapse",))
def _configure(config: SynapseClientConfiguration = config.value) -> SynapseClientConfiguration:
    """Return the Synapse client configuration handed in by ``with_config``.

    The function body is a passthrough; any resolution against the
    ``destination``/``synapse`` sections is done by the decorator.
    """
    return config


def capabilities() -> DestinationCapabilitiesContext:
    """Build the capabilities context describing the Synapse destination.

    Returns:
        A fresh ``DestinationCapabilitiesContext`` with loader file formats,
        escaping functions, numeric precision and size limits filled in.
    """
    # https://learn.microsoft.com/en-us/azure/synapse-analytics/sql-data-warehouse/sql-data-warehouse-service-capacity-limits
    context = DestinationCapabilitiesContext()

    # File formats: direct loading and staging.
    context.preferred_loader_file_format = "insert_values"
    context.supported_loader_file_formats = ["insert_values"]
    context.preferred_staging_file_format = "parquet"
    context.supported_staging_file_formats = ["parquet"]

    # Escaping of identifiers and literals.
    context.escape_identifier = escape_postgres_identifier
    context.escape_literal = escape_mssql_literal

    # Numeric precision as (precision, scale) pairs.
    context.decimal_precision = (DEFAULT_NUMERIC_PRECISION, DEFAULT_NUMERIC_SCALE)
    context.wei_precision = (DEFAULT_NUMERIC_PRECISION, 0)

    # Identifier, query and text size limits.
    context.max_identifier_length = 128
    context.max_column_identifier_length = 128
    context.max_query_length = 4 * 1024 * 64 * 1024
    context.is_max_query_length_in_bytes = True
    context.max_text_data_type_length = 4000
    context.is_max_text_data_type_length_in_bytes = False

    # Transaction and insert batching behaviour.
    context.supports_ddl_transactions = False
    context.max_rows_per_insert = 1000

    return context


def client(schema: Schema, initial_config: DestinationClientConfiguration = config.value) -> JobClientBase:
    """Create a Synapse job client for ``schema``.

    Args:
        schema: Schema the client will operate on.
        initial_config: Initial destination configuration; it is passed
            through ``_configure`` before being given to the client.

    Returns:
        A ``SynapseClient`` instance.
    """
    # import client when creating instance so capabilities and config specs can be accessed without dependencies installed
    from dlt.destinations.synapse.synapse import SynapseClient

    resolved_config = _configure(initial_config)  # type: ignore[arg-type]
    return SynapseClient(schema, resolved_config)


def spec() -> Type[DestinationClientConfiguration]:
    """Return the configuration class (not an instance) used by this destination."""
    return SynapseClientConfiguration
89 changes: 89 additions & 0 deletions dlt/destinations/synapse/configuration.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,89 @@
from typing import Final, ClassVar, Any, List, Optional
from sqlalchemy.engine import URL

from dlt.common.configuration import configspec
from dlt.common.configuration.specs import ConnectionStringCredentials
from dlt.common.utils import digest128
from dlt.common.typing import TSecretValue
from dlt.common.exceptions import SystemConfigurationException

from dlt.common.destination.reference import DestinationClientDwhWithStagingConfiguration


@configspec
class SynapseCredentials(ConnectionStringCredentials):
    """Connection credentials for the Synapse destination.

    Extends ``ConnectionStringCredentials`` with a connection timeout and an
    ODBC driver name, and can render itself either as a SQLAlchemy ``URL``
    (``to_url``) or as an ODBC connection string (``to_odbc_dsn``).
    """

    drivername: Final[str] = "sqlserver"  # type: ignore
    # NOTE(review): ``to_odbc_dsn`` reads ``self.username``, not ``self.user``;
    # confirm against the base class which of the two is actually used.
    user: Final[str] = "dltadmin"
    password: TSecretValue
    host: str
    port: int = 1433
    connect_timeout: int = 15
    odbc_driver: str = "ODBC Driver 17 for SQL Server"

    __config_gen_annotations__: ClassVar[List[str]] = ["port", "connect_timeout"]

    def parse_native_representation(self, native_value: Any) -> None:
        """Parse ``native_value`` and pick up ``connect_timeout`` from its query.

        Args:
            native_value: Value forwarded unchanged to the base-class parser.
        """
        # TODO: Support ODBC connection string or sqlalchemy URL
        super().parse_native_representation(native_value)
        # Keep the current timeout when the query part does not override it.
        self.connect_timeout = int(self.query.get("connect_timeout", self.connect_timeout))
        if not self.is_partial():
            self.resolve()

    def on_resolved(self) -> None:
        """Lower-case the database name."""
        self.database = self.database.lower()

    def to_url(self) -> URL:
        """Return the base-class URL with ``connect_timeout`` added to its query.

        Returns:
            A new ``URL`` carrying the ``connect_timeout`` query parameter.
        """
        # SQLAlchemy URL objects are immutable: update_query_pairs() returns a
        # NEW URL instead of modifying in place, so its result must be returned.
        # Discarding it silently dropped connect_timeout from the URL.
        return super().to_url().update_query_pairs(
            [("connect_timeout", str(self.connect_timeout))]
        )

    def on_partial(self) -> None:
        """Fill in the ODBC driver, then resolve if nothing else is missing."""
        self.odbc_driver = self._get_odbc_driver()
        if not self.is_partial():
            self.resolve()

    def _get_odbc_driver(self) -> str:
        """Return the configured ODBC driver or the first supported one installed.

        Returns:
            ``self.odbc_driver`` when it is set; otherwise the first entry of
            the supported list that ``pyodbc.drivers()`` reports.

        Raises:
            SystemConfigurationException: If no supported driver is available.
        """
        if self.odbc_driver:
            return self.odbc_driver
        # Pick a default driver if available
        supported_drivers = ['ODBC Driver 18 for SQL Server', 'ODBC Driver 17 for SQL Server']
        # Imported lazily so pyodbc is only needed when a driver must be looked up.
        import pyodbc
        available_drivers = pyodbc.drivers()
        for driver in supported_drivers:
            if driver in available_drivers:
                return driver
        docs_url = "https://learn.microsoft.com/en-us/sql/connect/odbc/download-odbc-driver-for-sql-server?view=sql-server-ver16"
        raise SystemConfigurationException(
            "No supported ODBC driver found for MS SQL Server. "
            f"See {docs_url} for information on how to install the '{supported_drivers[0]}' on your platform."
        )

    def to_odbc_dsn(self) -> str:
        """Render the credentials as a ``KEY=value;...`` ODBC connection string.

        Entries from ``self.query`` are merged last, so they override the
        defaults built here when keys collide.

        Returns:
            The semicolon-joined connection string.
        """
        # NOTE(review): Microsoft's ODBC driver documents the port as part of
        # the server value ("SERVER=host,port"); confirm that a separate PORT
        # keyword is honoured for non-default ports.
        params = {
            "DRIVER": self.odbc_driver,
            "SERVER": self.host,
            "PORT": self.port,
            "DATABASE": self.database,
            "UID": self.username,
            "PWD": self.password,
            "LongAsMax": "yes",
            "MARS_Connection": "yes"
        }
        if self.query:
            params.update(self.query)
        return ";".join(f"{k}={v}" for k, v in params.items())



@configspec
class SynapseClientConfiguration(DestinationClientDwhWithStagingConfiguration):
    """Client configuration for the Synapse destination."""

    destination_name: Final[str] = "synapse"  # type: ignore
    credentials: SynapseCredentials

    create_indexes: bool = False

    def fingerprint(self) -> str:
        """Returns a fingerprint of host part of a connection string"""
        # Without credentials or a host there is nothing to fingerprint.
        if not (self.credentials and self.credentials.host):
            return ""
        return digest128(self.credentials.host)
Loading
Loading