Fix databricks pandas error (#1443)
* update dependencies for databricks/dbt

* use kwargs if args not defined, fix typing

* Revert to use inline params to keep support for 13.x cluster

* Typing fix

* adds dbt support for mssql

* converts dbt deps from extra to group, allows databricks client >2.9.3

* fixes dict to env util

* limits dbt version to <1.8 in destination tests

* skips chess dbt package for mssql

---------

Co-authored-by: Oon Tong Tan <[email protected]>
Co-authored-by: Marcin Rudolf <[email protected]>
3 people authored Jun 11, 2024
1 parent a9021fe commit d4340d8
Showing 15 changed files with 262 additions and 252 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/lint.yml
@@ -59,7 +59,7 @@ jobs:

- name: Install dependencies
# if: steps.cached-poetry-dependencies.outputs.cache-hit != 'true'
run: poetry install --all-extras --with airflow,providers,pipeline,sentry-sdk
run: poetry install --all-extras --with airflow,providers,pipeline,sentry-sdk,dbt

- name: Run make lint
run: |
2 changes: 1 addition & 1 deletion .github/workflows/test_dbt_runner.yml
@@ -60,7 +60,7 @@ jobs:

- name: Install dependencies
# install dlt with postgres support
run: poetry install --no-interaction -E postgres -E dbt --with sentry-sdk
run: poetry install --no-interaction -E postgres --with sentry-sdk,dbt

- name: create secrets.toml
run: pwd && echo "$DLT_SECRETS_TOML" > tests/.dlt/secrets.toml
2 changes: 1 addition & 1 deletion Makefile
@@ -44,7 +44,7 @@ has-poetry:
poetry --version

dev: has-poetry
poetry install --all-extras --with airflow --with docs --with providers --with pipeline --with sentry-sdk
poetry install --all-extras --with airflow,docs,providers,pipeline,sentry-sdk,dbt

lint:
./tools/check-package.sh
12 changes: 11 additions & 1 deletion dlt/common/configuration/utils.py
@@ -178,7 +178,10 @@ def add_config_to_env(config: BaseConfiguration, sections: Tuple[str, ...] = ())


def add_config_dict_to_env(
dict_: Mapping[str, Any], sections: Tuple[str, ...] = (), overwrite_keys: bool = False
dict_: Mapping[str, Any],
sections: Tuple[str, ...] = (),
overwrite_keys: bool = False,
destructure_dicts: bool = True,
) -> None:
"""Writes values in dict_ back into environment using the naming convention of EnvironProvider. Applies `sections` if specified. Does not overwrite existing keys by default"""
for k, v in dict_.items():
@@ -193,5 +196,12 @@ def add_config_dict_to_env(
if env_key not in os.environ or overwrite_keys:
if v is None:
os.environ.pop(env_key, None)
elif isinstance(v, dict) and destructure_dicts:
add_config_dict_to_env(
v,
sections + (k,),
overwrite_keys=overwrite_keys,
destructure_dicts=destructure_dicts,
)
else:
os.environ[env_key] = serialize_value(v)
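The `destructure_dicts` flag added above changes how nested credential dicts reach dbt: each nested key becomes its own environment variable instead of one serialized blob. A minimal usage sketch follows; it assumes the EnvironProvider naming of upper-cased section and key names joined with `__`, and the section tuple and values are illustrative rather than taken from this commit.

```python
# Hedged sketch: flattening a nested credentials dict into environment variables
# with the new destructure_dicts behavior. Section names and values are examples.
import os

from dlt.common.configuration.utils import add_config_dict_to_env

credentials = {
    "host": "loader.database.windows.net",
    "username": "loader",
    "query": {"encrypt": "yes", "TrustServerCertificate": "yes"},  # nested dict
}

# destructure_dicts=True (the new default) recurses into the "query" dict,
# appending its key to the section path instead of serializing the whole dict.
# Existing environment keys are left untouched unless overwrite_keys=True.
add_config_dict_to_env(credentials, sections=("dlt", "credentials"))

# Expected keys under the naming assumed above, e.g.:
#   DLT__CREDENTIALS__HOST, DLT__CREDENTIALS__QUERY__ENCRYPT, ...
print(sorted(k for k in os.environ if k.startswith("DLT__CREDENTIALS")))
```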
2 changes: 1 addition & 1 deletion dlt/destinations/impl/databricks/databricks.py
@@ -264,7 +264,7 @@ def __init__(self, schema: Schema, config: DatabricksClientConfiguration) -> Non
sql_client = DatabricksSqlClient(config.normalize_dataset_name(schema), config.credentials)
super().__init__(schema, config, sql_client)
self.config: DatabricksClientConfiguration = config
self.sql_client: DatabricksSqlClient = sql_client
self.sql_client: DatabricksSqlClient = sql_client # type: ignore[assignment]
self.type_mapper = DatabricksTypeMapper(self.capabilities)

def start_file_load(self, table: TTableSchema, file_path: str, load_id: str) -> LoadJob:
26 changes: 13 additions & 13 deletions dlt/destinations/impl/databricks/sql_client.py
@@ -1,6 +1,7 @@
from contextlib import contextmanager, suppress
from typing import Any, AnyStr, ClassVar, Iterator, Optional, Sequence, List, Union, Dict


from databricks import sql as databricks_lib
from databricks.sql.client import (
Connection as DatabricksSqlConnection,
@@ -37,7 +38,9 @@ def __init__(self, dataset_name: str, credentials: DatabricksCredentials) -> Non

def open_connection(self) -> DatabricksSqlConnection:
conn_params = self.credentials.to_connector_params()
self._conn = databricks_lib.connect(**conn_params, schema=self.dataset_name)
self._conn = databricks_lib.connect(
**conn_params, schema=self.dataset_name, use_inline_params="silent"
)
return self._conn

@raise_open_connection_error
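For context, a hedged sketch of what the patched `open_connection` amounts to once `conn_params` is unpacked: `use_inline_params="silent"` keeps the connector's legacy inline parameter binding (needed for 13.x cluster runtimes) while silencing its deprecation warning. The connection parameters below are placeholders, not values from this commit.

```python
# Hedged sketch of the patched connection call; placeholder credentials.
from databricks import sql as databricks_lib

conn = databricks_lib.connect(
    server_hostname="adb-1234567890123456.7.azuredatabricks.net",  # placeholder
    http_path="/sql/1.0/warehouses/abc123",                        # placeholder
    access_token="dapi-placeholder-token",                         # placeholder
    schema="my_dataset",         # dlt passes the normalized dataset name here
    use_inline_params="silent",  # keep %s-style inline binding, suppress warning
)
```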
@@ -87,12 +90,14 @@ def execute_sql(
@contextmanager
@raise_database_error
def execute_query(self, query: AnyStr, *args: Any, **kwargs: Any) -> Iterator[DBApiCursor]:
curr: DBApiCursor = None
# TODO: databricks connector 3.0.0 will use :named paramstyle only
curr: DBApiCursor
# TODO: Inline param support will be dropped in future databricks driver, switch to :named paramstyle
# This will drop support for cluster runtime v13.x
# db_args: Optional[Dict[str, Any]]
# if args:
# keys = [f"arg{i}" for i in range(len(args))]
# # Replace position arguments (%s) with named arguments (:arg0, :arg1, ...)
# # query = query % tuple(f":{key}" for key in keys)
# query = query % tuple(f":{key}" for key in keys)
# db_args = {}
# for key, db_arg in zip(keys, args):
# # Databricks connector doesn't accept pendulum objects
@@ -102,15 +107,10 @@ def execute_query(self, query: AnyStr, *args: Any, **kwargs: Any) -> Iterator[DB
# db_arg = to_py_date(db_arg)
# db_args[key] = db_arg
# else:
# db_args = None
db_args: Optional[Union[Dict[str, Any], Sequence[Any]]]
if kwargs:
db_args = kwargs
elif args:
db_args = args
else:
db_args = None
with self._conn.cursor() as curr:
# db_args = kwargs or None

db_args = args or kwargs or None
with self._conn.cursor() as curr: # type: ignore[assignment]
curr.execute(query, db_args)
yield DBApiCursorImpl(curr) # type: ignore[abstract]
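The branching over `args` and `kwargs` collapses to one expression: positional parameters win, then keyword parameters, otherwise no parameters at all. A small, self-contained illustration of that fallback (values invented):

```python
# Hedged illustration of the args-or-kwargs fallback used in execute_query above.
from typing import Any, Dict, Optional, Sequence, Tuple, Union


def pick_db_args(
    args: Tuple[Any, ...], kwargs: Dict[str, Any]
) -> Optional[Union[Sequence[Any], Dict[str, Any]]]:
    # Positional parameters take precedence, then keyword parameters, else None.
    return args or kwargs or None


assert pick_db_args(("2024-06-11",), {}) == ("2024-06-11",)
assert pick_db_args((), {"load_id": "1718000000"}) == {"load_id": "1718000000"}
assert pick_db_args((), {}) is None
```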

1 change: 0 additions & 1 deletion dlt/extract/decorators.py
@@ -509,7 +509,6 @@ def decorator(
SPEC, resolvable_fields = spec_from_signature(
f, inspect.signature(f), include_defaults=standalone
)
print(SPEC, resolvable_fields, standalone)
if is_inner_resource and not standalone:
if len(resolvable_fields) > 0:
# prevent required arguments to inner functions that are not standalone
19 changes: 18 additions & 1 deletion dlt/helpers/dbt/profiles.yml
@@ -144,6 +144,23 @@ athena:
work_group: "{{ env_var('DLT__ATHENA_WORK_GROUP', '') }}"


mssql:
target: analytics
outputs:
analytics:
type: sqlserver
driver: "{{ env_var('DLT__CREDENTIALS__DRIVER') }}"
server: "{{ env_var('DLT__CREDENTIALS__HOST') }}"
port: "{{ env_var('DLT__CREDENTIALS__PORT') | as_number }}"
database: "{{ env_var('DLT__CREDENTIALS__DATABASE') }}"
schema: "{{ var('destination_dataset_name', var('source_dataset_name')) }}"
user: "{{ env_var('DLT__CREDENTIALS__USERNAME') }}"
password: "{{ env_var('DLT__CREDENTIALS__PASSWORD') }}"
login_timeout: "{{ env_var('DLT__CREDENTIALS__CONNECT_TIMEOUT', '0') | as_number }}"
encrypt: "{{ (env_var('DLT__CREDENTIALS__QUERY__ENCRYPT', 'No') == 'yes') | as_bool }}"
trust_cert: "{{ (env_var('DLT__CREDENTIALS__QUERY__TRUSTSERVERCERTIFICATE', 'yes') == 'yes') | as_bool }}"
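For reference, a hedged sketch of the environment the new mssql profile expects when the dbt runner starts: dlt exports destination credentials as `DLT__CREDENTIALS__*` variables, including the nested `query` options now flattened by `add_config_dict_to_env`. The variable names mirror the `env_var()` calls in the profile above; the values are purely illustrative.

```python
# Illustrative values only; the names match the env_var() calls in the profile.
import os

os.environ.update({
    "DLT__CREDENTIALS__DRIVER": "ODBC Driver 18 for SQL Server",
    "DLT__CREDENTIALS__HOST": "loader.database.windows.net",
    "DLT__CREDENTIALS__PORT": "1433",
    "DLT__CREDENTIALS__DATABASE": "dlt_data",
    "DLT__CREDENTIALS__USERNAME": "loader",
    "DLT__CREDENTIALS__PASSWORD": "<password>",
    "DLT__CREDENTIALS__QUERY__ENCRYPT": "yes",
    "DLT__CREDENTIALS__QUERY__TRUSTSERVERCERTIFICATE": "yes",
})
```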


# commented out because dbt for Synapse isn't currently properly supported.
# Leave config here for potential future use.
# synapse:
@@ -157,7 +174,7 @@ athena:
# database: "{{ env_var('DLT__CREDENTIALS__DATABASE') }}"
# schema: "{{ var('destination_dataset_name', var('source_dataset_name')) }}"
# user: "{{ env_var('DLT__CREDENTIALS__USERNAME') }}"
# password: "{{ env_var('DLT__CREDENTIALS__PASSWORD') }}"
# password: "{{ env_var('DLT__CREDENTIALS__PASSWORD') }}"


databricks:
2 changes: 1 addition & 1 deletion docs/examples/chess/dbt_transform/models/load_ids.sql
@@ -12,7 +12,7 @@
{% else %}
-- take only loads with status = 0 and no other records
SELECT load_id, schema_name, schema_version_hash FROM {{ source('dlt', '_dlt_loads') }}
GROUP BY 1, 2, 3
GROUP BY load_id, schema_name, schema_version_hash
-- note that it is a hack - we make sure no other statuses exist
HAVING SUM(status) = 0
{% endif %}
2 changes: 1 addition & 1 deletion docs/website/docs/dlt-ecosystem/destinations/mssql.md
@@ -141,7 +141,7 @@ destination.mssql.credentials="mssql://loader:<password>@loader.database.windows
```

### dbt support
No dbt support yet.
This destination [integrates with dbt](../transformations/dbt/dbt.md) via [dbt-sqlserver](https://github.com/dbt-msft/dbt-sqlserver).

<!--@@@DLT_TUBA mssql-->
