
[WIP] Synapse Destination #677

Closed · wants to merge 13 commits into from

Conversation

NeilGorman104 (Contributor)

Description

Adds the Synapse destination to the dlt destinations list.

This destination code is based on the mssql destination and makes the following changes:

  1. Updates "mssql" references to "synapse" in the destination code
  2. Adds a Synapse GitHub testing workflow
  3. Adds a synapse_escape_literal function in escape.py (lines 91-115)
  4. Updates max_text_data_type_length to 4000, the maximum character length for Synapse (__init__.py line 39)
  5. Changes nvarchar(max) to nvarchar(4000) and varbinary(max) to varbinary(8000) in the type mapping, since "max" length is not supported in Synapse (synapse.py lines 22-30)
  6. Removes IF EXISTS clauses from the queries in the generate_sql, drop_dataset, and drop_views functions (not supported in Synapse)
  7. Adds LongAsMax to the connection string so that ntext data types are mapped to nvarchar (configuration.py line 68)
  8. Adds handling that splits multi-row inserts into separate INSERT INTO queries joined with UNION ALL, since Synapse does not support multi-row inserts in a single INSERT INTO statement (insert_job_client.py lines 62-104); see the sketch after this list
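
For illustration, a minimal sketch of the UNION ALL rewrite from change 8; the helper name and signature are hypothetical rather than the PR's actual code, and values are assumed to be already escaped SQL literals:

    from typing import List, Sequence

    def rows_to_union_all_insert(
        table: str, columns: Sequence[str], rows: List[Sequence[str]]
    ) -> str:
        # Synapse rejects multi-row VALUES lists, so each row becomes a
        # SELECT and the rows are glued together with UNION ALL.
        col_list = ", ".join(columns)
        selects = "\nUNION ALL\n".join(
            "SELECT " + ", ".join(row) for row in rows
        )
        return f"INSERT INTO {table} ({col_list})\n{selects};"

    # Example: two rows become two SELECTs in a single statement
    print(rows_to_union_all_insert(
        "my_schema.events", ["id", "name"], [["1", "N'alice'"], ["2", "N'bob'"]]
    ))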

Remaining Issues

  1. Azure Blob Storage - We will need to implement a method that lands data in Azure Blob Storage and uses the COPY command to load it into SQL pools (see the sketch after this list): https://learn.microsoft.com/en-us/azure/synapse-analytics/sql/load-data-overview

  2. Resolve test error: "String or binary data would be truncated" for INSERT INTO queries that insert values longer than the 4000-character maximum

  3. Resolve test error: "Invalid object name '[TABLE]'" (the table is created by the CREATE TABLE query but is not recognized afterwards)

  4. Resolve error in the test_data_writer_string_escape test: "Operation cannot be performed within a transaction"

  5. Resolve error in the test_execute_df test: "Parse error at line: 1, column: 102: Incorrect syntax near ','"
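
A minimal sketch of remaining issue 1, assuming the data has already been staged as CSV files in a blob container; the table, account, container, and SAS token are placeholders, and the staging step itself is the part still to be implemented:

    import pyodbc  # the driver already used by the mssql-based client

    COPY_SQL = """
    COPY INTO my_schema.my_table
    FROM 'https://<account>.blob.core.windows.net/<container>/staged/*.csv'
    WITH (
        FILE_TYPE = 'CSV',
        CREDENTIAL = (IDENTITY = 'Shared Access Signature', SECRET = '<sas-token>'),
        FIELDTERMINATOR = ',',
        FIRSTROW = 2
    )
    """

    def copy_from_blob(conn: pyodbc.Connection) -> None:
        # Run with autocommit enabled: several Synapse operations refuse
        # to run inside a transaction (compare remaining issue 4).
        conn.autocommit = True
        conn.execute(COPY_SQL)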

@netlify
netlify bot commented Oct 6, 2023

Deploy Preview for dlt-hub-docs canceled.

🔨 Latest commit: d5a3572
🔍 Latest deploy log: https://app.netlify.com/sites/dlt-hub-docs/deploys/6529a1264c857f000928a6c5

@NeilGorman104 NeilGorman104 changed the title Synapse [WIP] Synapse Destination Oct 6, 2023
@NeilGorman104 NeilGorman104 marked this pull request as draft October 6, 2023 15:29
@@ -10,6 +10,7 @@
from dlt.destinations.sql_client import SqlClientBase
from dlt.destinations.job_impl import EmptyLoadJob
from dlt.destinations.job_client_impl import SqlJobClientWithStaging
from tests.utils import ACTIVE_DESTINATIONS
Collaborator:

you can't import test code into production code...

else:
    # Replace the , with ;
    insert_sql.append("".join(chunk).strip()[:-1] + ";\n")
if "synapse" in ACTIVE_DESTINATIONS:
Collaborator:

if you need different behavior here for synapse, you need to extend or replace this load job for synapse, or somehow manage it with the capabilities object. this code depends on a var that is only available in the test environment and will not work in production
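
A minimal sketch of the reviewer's suggestion, with hypothetical class and method names rather than dlt's actual API: give Synapse its own load job subclass and let the destination wiring pick it, instead of branching on a test-only variable at runtime:

    class InsertValuesLoadJob:
        # Stand-in for the generic mssql-style insert load job.
        def _build_insert_sql(self, table: str, rows: list) -> str:
            # generic path: one multi-row VALUES list, as mssql allows
            values = ",\n".join(f"({row})" for row in rows)
            return f"INSERT INTO {table}\nVALUES\n{values};\n"

    class SynapseInsertValuesLoadJob(InsertValuesLoadJob):
        # Synapse variant: one SELECT per row joined with UNION ALL,
        # since Synapse rejects multi-row VALUES lists.
        def _build_insert_sql(self, table: str, rows: list) -> str:
            selects = "\nUNION ALL\n".join(f"SELECT {row}" for row in rows)
            return f"INSERT INTO {table}\n{selects};\n"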

if not tables:
    return
# If a transaction is active, commit it
if not self._conn.autocommit:
Collaborator:

i don't think you need this kind of code here. In which case is a transaction active when you drop the dataset?

def _drop_views(self, *tables: str) -> None:
    # Save the current autocommit state
    # If a transaction is active, commit it
    if not self._conn.autocommit:
Collaborator:

same as above

def native_connection(self) -> pyodbc.Connection:
    return self._conn

def drop_dataset(self) -> None:
Collaborator:

why do you need to check if each table exists? can't you just drop all tables in the dataset like it is done in the mssql client?
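
For comparison, a minimal sketch of the approach the reviewer describes; dataset_name and the pyodbc wiring are assumptions, not the mssql client's actual code:

    def drop_dataset(self) -> None:
        # List every table in the dataset's schema and drop it. The PR notes
        # IF EXISTS is not supported in Synapse (change 6), and enumerating
        # the tables first makes per-table existence checks unnecessary.
        rows = self._conn.execute(
            "SELECT table_name FROM information_schema.tables "
            "WHERE table_schema = ?",
            self.dataset_name,
        ).fetchall()
        for (table_name,) in rows:
            self._conn.execute(f"DROP TABLE {self.dataset_name}.{table_name}")
        self._conn.execute(f"DROP SCHEMA {self.dataset_name}")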
