Commit

Merge branch 'devel' into gdrive_new_source

IlyaFaer committed Feb 7, 2024
2 parents 950cb06 + 5b240cd commit 1ac6153
Showing 70 changed files with 11,535 additions and 8,787 deletions.
88 changes: 88 additions & 0 deletions .github/workflows/test_destination_databricks.yml
@@ -0,0 +1,88 @@

name: test databricks

on:
pull_request:
branches:
- master
- devel
workflow_dispatch:

env:
DLT_SECRETS_TOML: ${{ secrets.DLT_SECRETS_TOML }}

RUNTIME__SENTRY_DSN: https://[email protected]/4504819859914752
RUNTIME__LOG_LEVEL: ERROR

ACTIVE_DESTINATIONS: "[\"databricks\"]"
ALL_FILESYSTEM_DRIVERS: "[\"memory\"]"

jobs:
get_docs_changes:
uses: ./.github/workflows/get_docs_changes.yml
if: ${{ !github.event.pull_request.head.repo.fork }}

run_loader:
name: Tests Databricks loader
needs: get_docs_changes
if: needs.get_docs_changes.outputs.changes_outside_docs == 'true'
strategy:
fail-fast: false
matrix:
os: ["ubuntu-latest"]
# os: ["ubuntu-latest", "macos-latest", "windows-latest"]
defaults:
run:
shell: bash
runs-on: ${{ matrix.os }}

steps:

- name: Check out
uses: actions/checkout@master

- name: Setup Python
uses: actions/setup-python@v4
with:
python-version: "3.10.x"

- name: Install Poetry
uses: snok/install-poetry@v1.3.2
with:
virtualenvs-create: true
virtualenvs-in-project: true
installer-parallel: true

- name: Load cached venv
id: cached-poetry-dependencies
uses: actions/cache@v3
with:
path: .venv
key: venv-${{ runner.os }}-${{ steps.setup-python.outputs.python-version }}-${{ hashFiles('**/poetry.lock') }}-gcp

- name: Install dependencies
run: poetry install --no-interaction -E databricks -E s3 -E gs -E az -E parquet --with sentry-sdk --with pipeline

- name: create secrets.toml
run: pwd && echo "$DLT_SECRETS_TOML" > tests/.dlt/secrets.toml

- run: |
poetry run pytest tests/load
if: runner.os != 'Windows'
name: Run tests Linux/MAC
- run: |
poetry run pytest tests/load
if: runner.os == 'Windows'
name: Run tests Windows
shell: cmd
matrix_job_required_check:
name: Databricks loader tests
needs: run_loader
runs-on: ubuntu-latest
if: always()
steps:
- name: Check matrix job results
if: contains(needs.*.result, 'failure') || contains(needs.*.result, 'cancelled')
run: |
echo "One or more matrix job tests failed or were cancelled. You may need to re-run them." && exit 1
4 changes: 2 additions & 2 deletions .github/workflows/test_destination_mssql.yml
@@ -71,11 +71,11 @@ jobs:
run: pwd && echo "$DLT_SECRETS_TOML" > tests/.dlt/secrets.toml

- run: |
poetry run pytest tests/load --ignore tests/load/pipeline/test_dbt_helper.py
poetry run pytest tests/load
if: runner.os != 'Windows'
name: Run tests Linux/MAC
- run: |
poetry run pytest tests/load --ignore tests/load/pipeline/test_dbt_helper.py
poetry run pytest tests/load
if: runner.os == 'Windows'
name: Run tests Windows
shell: cmd
90 changes: 90 additions & 0 deletions .github/workflows/test_destination_synapse.yml
@@ -0,0 +1,90 @@
name: test synapse

on:
pull_request:
branches:
- master
- devel
workflow_dispatch:

env:
DLT_SECRETS_TOML: ${{ secrets.DLT_SECRETS_TOML }}

RUNTIME__SENTRY_DSN: https://cf6086f7d263462088b9fb9f9947caee@o4505514867163136.ingest.sentry.io/4505516212682752
RUNTIME__LOG_LEVEL: ERROR

ACTIVE_DESTINATIONS: "[\"synapse\"]"
ALL_FILESYSTEM_DRIVERS: "[\"memory\"]"

jobs:
get_docs_changes:
uses: ./.github/workflows/get_docs_changes.yml
if: ${{ !github.event.pull_request.head.repo.fork }}

run_loader:
name: Tests Synapse loader
needs: get_docs_changes
if: needs.get_docs_changes.outputs.changes_outside_docs == 'true'
strategy:
fail-fast: false
matrix:
os: ["ubuntu-latest"]
defaults:
run:
shell: bash
runs-on: ${{ matrix.os }}

steps:

- name: Check out
uses: actions/checkout@master

- name: Install ODBC driver for SQL Server
run: |
sudo ACCEPT_EULA=Y apt-get install --yes msodbcsql18
- name: Setup Python
uses: actions/setup-python@v4
with:
python-version: "3.10.x"

- name: Install Poetry
uses: snok/install-poetry@v1.3.2
with:
virtualenvs-create: true
virtualenvs-in-project: true
installer-parallel: true

- name: Load cached venv
id: cached-poetry-dependencies
uses: actions/cache@v3
with:
path: .venv
key: venv-${{ runner.os }}-${{ steps.setup-python.outputs.python-version }}-${{ hashFiles('**/poetry.lock') }}-gcp

- name: Install dependencies
run: poetry install --no-interaction -E synapse -E parquet --with sentry-sdk --with pipeline

- name: create secrets.toml
run: pwd && echo "$DLT_SECRETS_TOML" > tests/.dlt/secrets.toml

- run: |
poetry run pytest tests/load
if: runner.os != 'Windows'
name: Run tests Linux/MAC
- run: |
poetry run pytest tests/load
if: runner.os == 'Windows'
name: Run tests Windows
shell: cmd
matrix_job_required_check:
name: Synapse loader tests
needs: run_loader
runs-on: ubuntu-latest
if: always()
steps:
- name: Check matrix job results
if: contains(needs.*.result, 'failure') || contains(needs.*.result, 'cancelled')
run: |
echo "One or more matrix job tests failed or were cancelled. You may need to re-run them." && exit 1
24 changes: 24 additions & 0 deletions dlt/common/configuration/specs/aws_credentials.py
@@ -38,6 +38,13 @@ def to_native_representation(self) -> Dict[str, Optional[str]]:
"""Return a dict that can be passed as kwargs to boto3 session"""
return dict(self)

def to_session_credentials(self) -> Dict[str, str]:
return dict(
aws_access_key_id=self.aws_access_key_id,
aws_secret_access_key=self.aws_secret_access_key,
aws_session_token=self.aws_session_token,
)


@configspec
class AwsCredentials(AwsCredentialsWithoutDefaults, CredentialsWithDefault):
@@ -47,6 +54,23 @@ def on_partial(self) -> None:
if self._from_session(session) and not self.is_partial():
self.resolve()

def to_session_credentials(self) -> Dict[str, str]:
"""Return configured or new aws session token"""
if self.aws_session_token and self.aws_access_key_id and self.aws_secret_access_key:
return dict(
aws_access_key_id=self.aws_access_key_id,
aws_secret_access_key=self.aws_secret_access_key,
aws_session_token=self.aws_session_token,
)
sess = self._to_botocore_session()
client = sess.create_client("sts")
token = client.get_session_token()
return dict(
aws_access_key_id=token["Credentials"]["AccessKeyId"],
aws_secret_access_key=token["Credentials"]["SecretAccessKey"],
aws_session_token=token["Credentials"]["SessionToken"],
)

def _to_botocore_session(self) -> Any:
try:
import botocore.session
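A standalone sketch of the STS fallback added above — it mirrors the logic of AwsCredentials.to_session_credentials() but uses botocore/boto3 directly rather than dlt's spec classes, and assumes an AWS region and credentials are available from the environment.

# Illustrative sketch, not dlt's actual call path.
from typing import Dict, Optional

import boto3
import botocore.session


def to_session_credentials(
    access_key: Optional[str] = None,
    secret_key: Optional[str] = None,
    session_token: Optional[str] = None,
) -> Dict[str, str]:
    # a fully configured token wins, exactly as in the diff
    if access_key and secret_key and session_token:
        return dict(
            aws_access_key_id=access_key,
            aws_secret_access_key=secret_key,
            aws_session_token=session_token,
        )
    # otherwise request a temporary token from STS via the default credential chain
    sess = botocore.session.get_session()
    token = sess.create_client("sts").get_session_token()
    return dict(
        aws_access_key_id=token["Credentials"]["AccessKeyId"],
        aws_secret_access_key=token["Credentials"]["SecretAccessKey"],
        aws_session_token=token["Credentials"]["SessionToken"],
    )


# the returned dict can be splatted straight into a boto3 session
session = boto3.Session(**to_session_credentials())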
31 changes: 29 additions & 2 deletions dlt/common/data_writers/escape.py
@@ -98,8 +98,14 @@ def escape_mssql_literal(v: Any) -> Any:
json.dumps(v), prefix="N'", escape_dict=MS_SQL_ESCAPE_DICT, escape_re=MS_SQL_ESCAPE_RE
)
if isinstance(v, bytes):
base_64_string = base64.b64encode(v).decode("ascii")
return f"""CAST('' AS XML).value('xs:base64Binary("{base_64_string}")', 'VARBINARY(MAX)')"""
from dlt.destinations.impl.mssql.mssql import VARBINARY_MAX_N

if len(v) <= VARBINARY_MAX_N:
n = str(len(v))
else:
n = "MAX"
return f"CONVERT(VARBINARY({n}), '{v.hex()}', 2)"

if isinstance(v, bool):
return str(int(v))
if v is None:
@@ -124,3 +130,24 @@ def escape_snowflake_identifier(v: str) -> str:
# Snowflake uppercases all identifiers unless quoted. Match this here so queries on the information schema work without issue
# See also https://docs.snowflake.com/en/sql-reference/identifiers-syntax#double-quoted-identifiers
return escape_postgres_identifier(v.upper())


escape_databricks_identifier = escape_bigquery_identifier


DATABRICKS_ESCAPE_DICT = {"'": "\\'", "\\": "\\\\", "\n": "\\n", "\r": "\\r"}


def escape_databricks_literal(v: Any) -> Any:
if isinstance(v, str):
return _escape_extended(v, prefix="'", escape_dict=DATABRICKS_ESCAPE_DICT)
if isinstance(v, (datetime, date, time)):
return f"'{v.isoformat()}'"
if isinstance(v, (list, dict)):
return _escape_extended(json.dumps(v), prefix="'", escape_dict=DATABRICKS_ESCAPE_DICT)
if isinstance(v, bytes):
return f"X'{v.hex()}'"
if v is None:
return "NULL"

return str(v)
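The literals these helpers are expected to produce, shown as a small sketch — the sample inputs are arbitrary, the outputs follow from the code above, and it assumes dlt is installed with the mssql extra (the bytes branch imports VARBINARY_MAX_N from the mssql destination module).

# Illustrative expectations for the escaping helpers above.
from dlt.common.data_writers.escape import (
    escape_databricks_literal,
    escape_mssql_literal,
)

assert escape_databricks_literal(None) == "NULL"
assert escape_databricks_literal(b"\x01\xff") == "X'01ff'"
# mssql binary values are now hex CONVERTs instead of base64-via-XML casts
assert escape_mssql_literal(b"\x01\xff") == "CONVERT(VARBINARY(2), '01ff', 2)"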
24 changes: 17 additions & 7 deletions dlt/common/data_writers/writers.py
@@ -175,18 +175,29 @@ def write_header(self, columns_schema: TTableSchemaColumns) -> None:
# do not write INSERT INTO command, this must be added together with table name by the loader
self._f.write("INSERT INTO {}(")
self._f.write(",".join(map(self._caps.escape_identifier, headers)))
self._f.write(")\nVALUES\n")
if self._caps.insert_values_writer_type == "default":
self._f.write(")\nVALUES\n")
elif self._caps.insert_values_writer_type == "select_union":
self._f.write(")\n")

def write_data(self, rows: Sequence[Any]) -> None:
super().write_data(rows)

def write_row(row: StrAny) -> None:
def write_row(row: StrAny, last_row: bool = False) -> None:
output = ["NULL"] * len(self._headers_lookup)
for n, v in row.items():
output[self._headers_lookup[n]] = self._caps.escape_literal(v)
self._f.write("(")
self._f.write(",".join(output))
self._f.write(")")
if self._caps.insert_values_writer_type == "default":
self._f.write("(")
self._f.write(",".join(output))
self._f.write(")")
if not last_row:
self._f.write(",\n")
elif self._caps.insert_values_writer_type == "select_union":
self._f.write("SELECT ")
self._f.write(",".join(output))
if not last_row:
self._f.write("\nUNION ALL\n")

# if next chunk add separator
if self._chunks_written > 0:
@@ -195,10 +206,9 @@ def write_row(row: StrAny) -> None:
# write rows
for row in rows[:-1]:
write_row(row)
self._f.write(",\n")

# write last row without separator so we can write footer eventually
write_row(rows[-1])
write_row(rows[-1], last_row=True)
self._chunks_written += 1

def write_footer(self) -> None:
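To make the two writer modes concrete, a standalone sketch (not dlt's actual writer class) of the SQL fragments each mode produces: "default" renders a VALUES list, while the new "select_union" mode renders SELECT ... UNION ALL rows.

# Illustrative only; escape() is a stand-in for caps.escape_literal.
from typing import Any, Sequence


def render_insert(rows: Sequence[Sequence[Any]], writer_type: str = "default") -> str:
    def escape(v: Any) -> str:
        return "NULL" if v is None else repr(v)

    rendered = [",".join(escape(v) for v in row) for row in rows]
    if writer_type == "default":
        return "VALUES\n" + ",\n".join(f"({r})" for r in rendered)
    if writer_type == "select_union":
        return "\nUNION ALL\n".join(f"SELECT {r}" for r in rendered)
    raise ValueError(writer_type)


print(render_insert([(1, "a"), (2, None)]))
# VALUES
# (1,'a'),
# (2,NULL)
print(render_insert([(1, "a"), (2, None)], "select_union"))
# SELECT 1,'a'
# UNION ALL
# SELECT 2,NULL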
5 changes: 5 additions & 0 deletions dlt/common/destination/capabilities.py
@@ -52,6 +52,10 @@ class DestinationCapabilitiesContext(ContainerInjectableContext):
schema_supports_numeric_precision: bool = True
timestamp_precision: int = 6
max_rows_per_insert: Optional[int] = None
insert_values_writer_type: str = "default"
supports_multiple_statements: bool = True
supports_clone_table: bool = False
"""Destination supports CREATE TABLE ... CLONE ... statements"""

# do not allow to create default value, destination caps must be always explicitly inserted into container
can_create_default: ClassVar[bool] = False
@@ -77,4 +81,5 @@ def generic_capabilities(
caps.is_max_text_data_type_length_in_bytes = True
caps.supports_ddl_transactions = True
caps.supports_transactions = True
caps.supports_multiple_statements = True
return caps
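A hypothetical illustration of how a destination could opt into the new capability flags — the concrete values below are assumptions for a Databricks-like destination, not taken from this commit.

# Hypothetical values; not from this commit.
from dlt.common.destination.capabilities import DestinationCapabilitiesContext

caps = DestinationCapabilitiesContext.generic_capabilities("insert_values")
caps.insert_values_writer_type = "select_union"  # emit SELECT ... UNION ALL instead of VALUES
caps.supports_multiple_statements = False        # send one statement per execute
caps.supports_clone_table = True                 # CREATE TABLE ... CLONE ... is available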
8 changes: 8 additions & 0 deletions dlt/common/schema/schema.py
@@ -546,12 +546,20 @@ def data_tables(self, include_incomplete: bool = False) -> List[TTableSchema]:
)
]

def data_table_names(self) -> List[str]:
"""Returns list of table table names. Excludes dlt table names."""
return [t["name"] for t in self.data_tables()]

def dlt_tables(self) -> List[TTableSchema]:
"""Gets dlt tables"""
return [
t for t in self._schema_tables.values() if t["name"].startswith(self._dlt_tables_prefix)
]

def dlt_table_names(self) -> List[str]:
"""Returns list of dlt table names."""
return [t["name"] for t in self.dlt_tables()]

def get_preferred_type(self, col_name: str) -> Optional[TDataType]:
return next((m[1] for m in self._compiled_preferred_types if m[0].search(col_name)), None)

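The new name helpers in use — a minimal sketch; a freshly created Schema contains only the internal dlt tables.

# Minimal sketch of the new helpers on a fresh schema.
from dlt.common.schema import Schema

schema = Schema("example")
print(schema.data_table_names())  # [] - no user tables yet
print(schema.dlt_table_names())   # internal tables, e.g. ['_dlt_version', '_dlt_loads']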
(remaining changed files not shown)