Synapse #708

Closed · wants to merge 22 commits

Commits (changes shown from all 22 commits):
93723b4  Synapse init file, leveraging 'escape_mssql_literal' (eryanRM, Oct 24, 2023)
44929b1  sql_client: passing most tests (eryanRM, Oct 24, 2023)
1271547  synapse setup (eryanRM, Oct 24, 2023)
be41558  update escape character, text mapping handling (eryanRM, Oct 24, 2023)
b27008f  updating file_format param for test utils destinations_config (eryanRM, Oct 24, 2023)
ad3eee8  migrating synapse handling from old branch (eryanRM, Oct 24, 2023)
1ad41b3  updated test_execute_df to have synapse insert handling -- can update… (eryanRM, Oct 24, 2023)
0c3f132  Devel sync pt 1: aws creds, config, conftest, test_buffered_writer, t… (eryanRM, Oct 24, 2023)
db08468  Devel sync pt 2: dlt/common (eryanRM, Oct 24, 2023)
0a5d4b3  Devel sync pt 3: tests/load, dlt/extract, dlt/destinations/filesystem (eryanRM, Oct 24, 2023)
d9627d4  Devel sync pt 4: .md files / docs (eryanRM, Oct 25, 2023)
b4067d7  Devel sync pt 5: tests (eryanRM, Oct 25, 2023)
84e46a6  Devel sync pt 6: dlt/extract (eryanRM, Oct 25, 2023)
a62fef5  synapse/sql_client: remove logging, revert tx handling, revert execut… (eryanRM, Oct 25, 2023)
c0661e7  migrated logic into synapse.py and updated sql_client.py to mirror mssql (eryanRM, Oct 26, 2023)
d48349d  removing print statements (eryanRM, Oct 26, 2023)
be05abb  generate_insert_query cleanup (SQL injection) (eryanRM, Oct 26, 2023)
77ebcda  cleaned logging and print statements for easier review (eryanRM, Oct 27, 2023)
b4a7798  devel sync (eryanRM, Oct 27, 2023)
754541b  updated handling for '\n' from pyodbc and () for row tuples (eryanRM, Oct 29, 2023)
b65053e  reverting insert_job_client logic (confirm Synapse handling) (eryanRM, Oct 29, 2023)
2adc3b2  passing assertion errors and correctly inserting rows (eryanRM, Oct 30, 2023)
4 changes: 2 additions & 2 deletions .github/ISSUE_TEMPLATE/bug_report.yml
@@ -7,7 +7,7 @@ body:
attributes:
value: |
Thanks for reporting a bug for dlt! Please fill out the sections below.
- If you are not sure if this is a bug or not, please join our [Slack](https://join.slack.com/t/dlthub-community/shared_invite/zt-1slox199h-HAE7EQoXmstkP_bTqal65g)
+ If you are not sure if this is a bug or not, please join our [Slack](https://join.slack.com/t/dlthub-community/shared_invite/zt-1n5193dbq-rCBmJ6p~ckpSFK4hCF2dYA)
and ask in the #3-technical-help channel.
- type: input
attributes:
@@ -34,7 +34,7 @@ body:
attributes:
label: Steps to reproduce
description: >
- How can we replicate the issue? If it's not straightforward to reproduce, please join our [Slack](https://join.slack.com/t/dlthub-community/shared_invite/zt-1slox199h-HAE7EQoXmstkP_bTqal65g)
+ How can we replicate the issue? If it's not straightforward to reproduce, please join our [Slack](https://join.slack.com/t/dlthub-community/shared_invite/zt-1n5193dbq-rCBmJ6p~ckpSFK4hCF2dYA)
and ask in the #3-technical-help channel.
placeholder: >
Provide a step-by-step description of how to reproduce the problem you are running into.
2 changes: 1 addition & 1 deletion .github/ISSUE_TEMPLATE/config.yml
@@ -2,5 +2,5 @@
blank_issues_enabled: true
contact_links:
- name: Ask a question or get support on dlt Slack
- url: https://join.slack.com/t/dlthub-community/shared_invite/zt-1slox199h-HAE7EQoXmstkP_bTqal65g
+ url: https://join.slack.com/t/dlthub-community/shared_invite/zt-1n5193dbq-rCBmJ6p~ckpSFK4hCF2dYA
about: Need help or support? Join our dlt community on Slack and get assistance.
2 changes: 1 addition & 1 deletion .github/ISSUE_TEMPLATE/feature_request.yml
@@ -7,7 +7,7 @@ body:
attributes:
value: |
Thanks for suggesting a feature for dlt!
- If you like to discuss your idea first, please join our [Slack](https://join.slack.com/t/dlthub-community/shared_invite/zt-1slox199h-HAE7EQoXmstkP_bTqal65g)
+ If you like to discuss your idea first, please join our [Slack](https://join.slack.com/t/dlthub-community/shared_invite/zt-1n5193dbq-rCBmJ6p~ckpSFK4hCF2dYA)
and pose your questions in the #3-technical-help channel.
For minor features and improvements, feel free to open a [pull request](https://github.com/dlt-hub/dlt/pulls) directly.
- type: textarea
2 changes: 1 addition & 1 deletion .github/workflows/test_doc_snippets.yml
@@ -56,7 +56,7 @@ jobs:

- name: Install dependencies
# if: steps.cached-poetry-dependencies.outputs.cache-hit != 'true'
- run: poetry install --no-interaction -E duckdb -E weaviate --with docs --without airflow
+ run: poetry install --no-interaction -E duckdb -E weaviate -E parquet --with docs --without airflow

- name: Run linter and tests
run: make test-and-lint-snippets
10 changes: 10 additions & 0 deletions dlt/common/configuration/exceptions.py
@@ -1,6 +1,8 @@
import os
from typing import Any, Mapping, Type, Tuple, NamedTuple, Sequence

from dlt.common.exceptions import DltException, TerminalException
from dlt.common.utils import main_module_file_path


class LookupTrace(NamedTuple):
@@ -48,6 +50,14 @@ def __str__(self) -> str:
msg += f'\tfor field "{f}" config providers and keys were tried in following order:\n'
for tr in field_traces:
msg += f'\t\tIn {tr.provider} key {tr.key} was not found.\n'
# check if the entry point was run with a path. this is a common problem, so warn the user
main_path = main_module_file_path()
main_dir = os.path.dirname(main_path)
abs_main_dir = os.path.abspath(main_dir)
if abs_main_dir != os.getcwd():
# directory was specified
msg += "WARNING: dlt looks for .dlt folder in your current working directory and your cwd (%s) is different from directory of your pipeline script (%s).\n" % (os.getcwd(), abs_main_dir)
msg += "If you keep your secret files in the same folder as your pipeline script but run your script from some other folder, secrets/configs will not be found\n"
msg += "Please refer to https://dlthub.com/docs/general-usage/credentials for more information\n"
return msg

7 changes: 5 additions & 2 deletions dlt/common/configuration/specs/aws_credentials.py
@@ -1,7 +1,7 @@
from typing import Optional, Dict, Any

from dlt.common.exceptions import MissingDependencyException
- from dlt.common.typing import TSecretStrValue
+ from dlt.common.typing import TSecretStrValue, DictStrAny
from dlt.common.configuration.specs import CredentialsConfiguration, CredentialsWithDefault, configspec
from dlt.common.configuration.specs.exceptions import InvalidBoto3Session
from dlt import version
@@ -19,13 +19,16 @@ class AwsCredentialsWithoutDefaults(CredentialsConfiguration):

def to_s3fs_credentials(self) -> Dict[str, Optional[str]]:
"""Dict of keyword arguments that can be passed to s3fs"""
- return dict(
+ credentials: DictStrAny = dict(
key=self.aws_access_key_id,
secret=self.aws_secret_access_key,
token=self.aws_session_token,
profile=self.profile_name,
endpoint_url=self.endpoint_url,
)
if self.region_name:
credentials["client_kwargs"] = {"region_name": self.region_name}
return credentials

def to_native_representation(self) -> Dict[str, Optional[str]]:
"""Return a dict that can be passed as kwargs to boto3 session"""
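With `region_name` set, `to_s3fs_credentials()` now also emits a `client_kwargs` entry, which `s3fs` forwards to the underlying botocore client. A sketch of the expected shape (key, secret, and region values are illustrative):

```python
import s3fs

# Illustrative result of to_s3fs_credentials() when region_name == "eu-central-1"
credentials = {
    "key": "AKIA...",
    "secret": "...",
    "token": None,
    "profile": None,
    "endpoint_url": None,
    "client_kwargs": {"region_name": "eu-central-1"},
}
fs = s3fs.S3FileSystem(**credentials)  # region is passed through to the S3 client
```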
18 changes: 16 additions & 2 deletions dlt/common/data_writers/buffered.py
@@ -1,4 +1,5 @@
import gzip
from functools import reduce
from typing import List, IO, Any, Optional, Type, TypeVar, Generic

from dlt.common.utils import uniq_id
@@ -58,6 +59,7 @@ def __init__(
self._current_columns: TTableSchemaColumns = None
self._file_name: str = None
self._buffered_items: List[TDataItem] = []
self._buffered_items_count: int = 0
self._writer: TWriter = None
self._file: IO[Any] = None
self._closed = False
@@ -79,10 +81,20 @@ def write_data_item(self, item: TDataItems, columns: TTableSchemaColumns) -> None:
if isinstance(item, List):
# items coming in a single list will be written together, no matter how many there are
self._buffered_items.extend(item)
# update row count, if item supports "num_rows" it will be used to count items
if len(item) > 0 and hasattr(item[0], "num_rows"):
self._buffered_items_count += sum(tbl.num_rows for tbl in item)
else:
self._buffered_items_count += len(item)
else:
self._buffered_items.append(item)
# update row count, if item supports "num_rows" it will be used to count items
if hasattr(item, "num_rows"):
self._buffered_items_count += item.num_rows
else:
self._buffered_items_count += 1
# flush if max buffer exceeded
- if len(self._buffered_items) >= self.buffer_max_items:
+ if self._buffered_items_count >= self.buffer_max_items:
self._flush_items()
# rotate the file if max_bytes exceeded
if self._file:
@@ -118,7 +130,7 @@ def _rotate_file(self) -> None:
self._file_name = self.file_name_template % uniq_id(5) + "." + self._file_format_spec.file_extension

def _flush_items(self, allow_empty_file: bool = False) -> None:
- if len(self._buffered_items) > 0 or allow_empty_file:
+ if self._buffered_items_count > 0 or allow_empty_file:
# we only open a writer when there are any items in the buffer and first flush is requested
if not self._writer:
# create new writer and write header
@@ -131,7 +143,9 @@ def _flush_items(self, allow_empty_file: bool = False) -> None:
# write buffer
if self._buffered_items:
self._writer.write_data(self._buffered_items)
# reset buffer and counter
self._buffered_items.clear()
self._buffered_items_count = 0

def _flush_and_close_file(self) -> None:
# if any buffered items exist, flush them
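The buffer threshold now counts rows instead of buffered items, so a single Arrow table holding 10,000 rows triggers a flush just like 10,000 plain items would. A standalone sketch of the counting rule used above:

```python
from typing import Any

def count_buffered_rows(item: Any) -> int:
    """Mirrors the row counting added to write_data_item."""
    if isinstance(item, list):
        # Arrow tables / record batches expose "num_rows"; other items count as 1 each
        if len(item) > 0 and hasattr(item[0], "num_rows"):
            return sum(tbl.num_rows for tbl in item)
        return len(item)
    return item.num_rows if hasattr(item, "num_rows") else 1
```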
32 changes: 32 additions & 0 deletions dlt/common/data_writers/escape.py
@@ -89,6 +89,38 @@ def escape_mssql_literal(v: Any) -> Any:
return str(v)


# TODO review escape logic -- this should resolve SQL injection / assertion errors
def escape_synapse_literal(v: Any) -> Any:
if isinstance(v, str):
# escape quotes / control chars via the Synapse escape dict and wrap in a unicode string literal
return _escape_extended(v, prefix="N'", escape_dict=SYNAPSE_ESCAPE_DICT, escape_re=SYNAPSE_SQL_ESCAPE_RE)
elif isinstance(v, (datetime, date, time)):
return f"'{v.isoformat()}'"
elif isinstance(v, (list, dict)):
return _escape_extended(json.dumps(v), prefix="N'", escape_dict=SYNAPSE_ESCAPE_DICT, escape_re=SYNAPSE_SQL_ESCAPE_RE)
elif isinstance(v, bytes):
base_64_string = base64.b64encode(v).decode('ascii')
return f"""CAST('' AS XML).value('xs:base64Binary("{base_64_string}")', 'VARBINARY(MAX)')"""
elif isinstance(v, bool):
return str(int(v))
return str(v)



# TODO add references
SYNAPSE_ESCAPE_DICT = {
"'": "''",
'\n': "' + CHAR(10) + N'",
'\r': "' + CHAR(13) + N'",
'\t': "' + CHAR(9) + N'",
}

SYNAPSE_SQL_ESCAPE_RE = _make_sql_escape_re(SYNAPSE_ESCAPE_DICT)

def escape_synapse_identifier(v: str) -> str:
return '"' + v.replace('"', '""') + '"'


def escape_redshift_identifier(v: str) -> str:
return '"' + v.replace('"', '""').replace("\\", "\\\\") + '"'

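A quick spot-check of the new helpers (a sketch; assumes this branch is installed, with output shapes following the `_escape_extended` semantics above):

```python
from dlt.common.data_writers.escape import (
    escape_synapse_identifier,
    escape_synapse_literal,
)

print(escape_synapse_literal("O'Brien\nline two"))
# expected shape: N'O''Brien' + CHAR(10) + N'line two'
print(escape_synapse_literal(True))             # 1
print(escape_synapse_identifier('weird"name'))  # "weird""name"
```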
2 changes: 2 additions & 0 deletions dlt/common/data_writers/writers.py
@@ -274,6 +274,8 @@ def write_data(self, rows: Sequence[Any]) -> None:
self.writer.write_batch(row)
else:
raise ValueError(f"Unsupported type {type(row)}")
# count rows that got written
self.items_count += row.num_rows

@classmethod
def data_format(cls) -> TFileFormatSpec:
12 changes: 12 additions & 0 deletions dlt/common/schema/schema.py
@@ -393,6 +393,18 @@ def update_normalizers(self) -> None:
normalizers["json"] = normalizers["json"] or self._normalizers_config["json"]
self._configure_normalizers(normalizers)

def add_type_detection(self, detection: TTypeDetections) -> None:
"""Add type auto detection to the schema."""
if detection not in self.settings["detections"]:
self.settings["detections"].append(detection)
self._compile_settings()

def remove_type_detection(self, detection: TTypeDetections) -> None:
"""Adds type auto detection to the schema."""
if detection in self.settings["detections"]:
self.settings["detections"].remove(detection)
self._compile_settings()

def _infer_column(self, k: str, v: Any, data_type: TDataType = None, is_variant: bool = False) -> TColumnSchema:
column_schema = TColumnSchema(
name=k,
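Together with the `standard_type_detections()` change below (numeric `timestamp` detection is dropped from the defaults), these helpers let users opt back in per schema. A sketch, assuming `schema` is a resolved `Schema` instance (e.g. `pipeline.default_schema` after a first run):

```python
schema.add_type_detection("timestamp")     # re-enable numeric timestamp detection
schema.remove_type_detection("timestamp")  # ...and switch it off again
```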
2 changes: 1 addition & 1 deletion dlt/common/schema/utils.py
@@ -698,4 +698,4 @@ def standard_hints() -> Dict[TColumnHint, List[TSimpleRegex]]:


def standard_type_detections() -> List[TTypeDetections]:
return ["timestamp", "iso_timestamp"]
return ["iso_timestamp"]
2 changes: 1 addition & 1 deletion dlt/destinations/filesystem/filesystem.py
@@ -131,7 +131,7 @@ def initialize_storage(self, truncate_tables: Iterable[str] = None) -> None:
for search_prefix in truncate_prefixes:
if item.startswith(search_prefix):
# NOTE: deleting in chunks on s3 does not raise on access denied, file non existing and probably other errors
logger.info(f"DEL {item}")
# logger.info(f"DEL {item}")
# print(f"DEL {item}")
self.fs_client.rm(item)
except FileNotFoundError:
1 change: 1 addition & 0 deletions dlt/destinations/insert_job_client.py
@@ -114,6 +114,7 @@ def start_file_load(self, table: TTableSchema, file_path: str, load_id: str) -> LoadJob:
# this is using sql_client internally and will raise a right exception
if file_path.endswith("insert_values"):
job = InsertValuesLoadJob(table["name"], file_path, self.sql_client)

return job

# # TODO: implement indexes and primary keys for postgres
5 changes: 5 additions & 0 deletions dlt/destinations/synapse/README.md
@@ -0,0 +1,5 @@
# loader account setup

1. Create a new database: `CREATE DATABASE dlt_data;`
2. In `master`, create a login with a password: `CREATE LOGIN loader WITH PASSWORD = 'loader';` then, in `dlt_data`, create a user for it: `CREATE USER loader FOR LOGIN loader;`
3. Make the user a database owner (a lower-privileged role would also suffice): `EXEC sp_addrolemember 'db_owner', 'loader';`
52 changes: 52 additions & 0 deletions dlt/destinations/synapse/__init__.py
@@ -0,0 +1,52 @@
from typing import Type

from dlt.common.schema.schema import Schema
from dlt.common.configuration import with_config, known_sections
from dlt.common.configuration.accessors import config
from dlt.common.data_writers.escape import escape_synapse_literal, escape_synapse_identifier
from dlt.common.destination import DestinationCapabilitiesContext
from dlt.common.destination.reference import JobClientBase, DestinationClientConfiguration
from dlt.common.arithmetics import DEFAULT_NUMERIC_PRECISION, DEFAULT_NUMERIC_SCALE
from dlt.common.wei import EVM_DECIMAL_PRECISION

from dlt.destinations.synapse.configuration import SynapseClientConfiguration


@with_config(spec=SynapseClientConfiguration, sections=(known_sections.DESTINATION, "synapse",))
def _configure(config: SynapseClientConfiguration = config.value) -> SynapseClientConfiguration:
return config


def capabilities() -> DestinationCapabilitiesContext:
# https://learn.microsoft.com/en-us/azure/synapse-analytics/sql-data-warehouse/sql-data-warehouse-service-capacity-limits
caps = DestinationCapabilitiesContext()
caps.preferred_loader_file_format = "insert_values"
caps.supported_loader_file_formats = ["insert_values"]
caps.preferred_staging_file_format = "parquet"
caps.supported_staging_file_formats = ["parquet"]
caps.escape_identifier = escape_synapse_identifier
caps.escape_literal = escape_synapse_literal
caps.decimal_precision = (DEFAULT_NUMERIC_PRECISION, DEFAULT_NUMERIC_SCALE)
caps.wei_precision = (DEFAULT_NUMERIC_PRECISION, 0)
# https://learn.microsoft.com/en-us/sql/sql-server/maximum-capacity-specifications-for-sql-server?view=sql-server-ver16&redirectedfrom=MSDN
caps.max_identifier_length = 128
caps.max_column_identifier_length = 128
caps.max_query_length = 4 * 1024 * 64 * 1024
caps.is_max_query_length_in_bytes = True
caps.max_text_data_type_length = 2 ** 30 - 1
caps.is_max_text_data_type_length_in_bytes = False
caps.supports_ddl_transactions = False
caps.max_rows_per_insert = 1000

return caps


def client(schema: Schema, initial_config: DestinationClientConfiguration = config.value) -> JobClientBase:
# import client when creating instance so capabilities and config specs can be accessed without dependencies installed
from dlt.destinations.synapse.synapse import SynapseClient

return SynapseClient(schema, _configure(initial_config)) # type: ignore[arg-type]


def spec() -> Type[DestinationClientConfiguration]:
return SynapseClientConfiguration
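End to end, the new destination should behave like any other dlt destination. A sketch (assumes this branch is installed and Synapse credentials are configured in `secrets.toml`):

```python
import dlt

# "synapse" resolves to dlt.destinations.synapse via the factory above
pipeline = dlt.pipeline(
    pipeline_name="synapse_demo",
    destination="synapse",
    dataset_name="demo_data",
)
load_info = pipeline.run(
    [{"id": 1, "name": "O'Brien"}],  # exercises the single-quote escaping path
    table_name="users",
)
print(load_info)
```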
89 changes: 89 additions & 0 deletions dlt/destinations/synapse/configuration.py
@@ -0,0 +1,89 @@
from typing import Final, ClassVar, Any, List, Optional
from sqlalchemy.engine import URL

from dlt.common.configuration import configspec
from dlt.common.configuration.specs import ConnectionStringCredentials
from dlt.common.utils import digest128
from dlt.common.typing import TSecretValue
from dlt.common.exceptions import SystemConfigurationException

from dlt.common.destination.reference import DestinationClientDwhWithStagingConfiguration


@configspec
class SynapseCredentials(ConnectionStringCredentials):
drivername: Final[str] = "sqlserver" # type: ignore
user: Final[str] = "dltadmin"
password: TSecretValue
host: str
port: int = 1433
connect_timeout: int = 15
odbc_driver: str = "ODBC Driver 17 for SQL Server"

__config_gen_annotations__: ClassVar[List[str]] = ["port", "connect_timeout"]

def parse_native_representation(self, native_value: Any) -> None:
# TODO: Support ODBC connection string or sqlalchemy URL
super().parse_native_representation(native_value)
self.connect_timeout = int(self.query.get("connect_timeout", self.connect_timeout))
if not self.is_partial():
self.resolve()

def on_resolved(self) -> None:
self.database = self.database.lower()

def to_url(self) -> URL:
url = super().to_url()
url.update_query_pairs([("connect_timeout", str(self.connect_timeout))])
return url

def on_partial(self) -> None:
self.odbc_driver = self._get_odbc_driver()
if not self.is_partial():
self.resolve()

def _get_odbc_driver(self) -> str:
if self.odbc_driver:
return self.odbc_driver
# Pick a default driver if available
supported_drivers = ['ODBC Driver 18 for SQL Server', 'ODBC Driver 17 for SQL Server']
import pyodbc
available_drivers = pyodbc.drivers()
for driver in supported_drivers:
if driver in available_drivers:
return driver
docs_url = "https://learn.microsoft.com/en-us/sql/connect/odbc/download-odbc-driver-for-sql-server?view=sql-server-ver16"
raise SystemConfigurationException(
f"No supported ODBC driver found for MS SQL Server. "
f"See {docs_url} for information on how to install the '{supported_drivers[0]}' on your platform."
)

def to_odbc_dsn(self) -> str:
params = {
"DRIVER": self.odbc_driver,
"SERVER": self.host,
"PORT": self.port,
"DATABASE": self.database,
"UID": self.username,
"PWD": self.password,
"LongAsMax": "yes",
"MARS_Connection": "yes"
}
if self.query:
params.update(self.query)
return ";".join([f"{k}={v}" for k, v in params.items()])



@configspec
class SynapseClientConfiguration(DestinationClientDwhWithStagingConfiguration):
destination_name: Final[str] = "synapse" # type: ignore
credentials: SynapseCredentials

create_indexes: bool = False

def fingerprint(self) -> str:
"""Returns a fingerprint of host part of a connection string"""
if self.credentials and self.credentials.host:
return digest128(self.credentials.host)
return ""
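A sketch of how these credentials resolve into an ODBC connection string (host, password, and database are illustrative; assumes pyodbc and a supported ODBC driver are installed):

```python
creds = SynapseCredentials()
creds.parse_native_representation(
    "sqlserver://loader:strong-password@myworkspace.sql.azuresynapse.net:1433/dlt_data?connect_timeout=30"
)
print(creds.to_odbc_dsn())
# DRIVER=ODBC Driver 17 for SQL Server;SERVER=myworkspace.sql.azuresynapse.net;PORT=1433;
# DATABASE=dlt_data;UID=loader;PWD=strong-password;LongAsMax=yes;MARS_Connection=yes;connect_timeout=30
```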