Skip to content

Commit

Permalink
update dependencies for databricks/dbt
Browse files Browse the repository at this point in the history
  • Loading branch information
oonyoontong committed Jun 4, 2024
1 parent bad7645 commit 39779e9
Show file tree
Hide file tree
Showing 3 changed files with 377 additions and 171 deletions.
34 changes: 14 additions & 20 deletions dlt/destinations/impl/databricks/sql_client.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
from contextlib import contextmanager, suppress
from typing import Any, AnyStr, ClassVar, Iterator, Optional, Sequence, List, Union, Dict

import pendulum
from databricks import sql as databricks_lib
from databricks.sql.client import (
Connection as DatabricksSqlConnection,
Expand Down Expand Up @@ -88,28 +89,21 @@ def execute_sql(
@raise_database_error
def execute_query(self, query: AnyStr, *args: Any, **kwargs: Any) -> Iterator[DBApiCursor]:
curr: DBApiCursor = None
# TODO: databricks connector 3.0.0 will use :named paramstyle only
# if args:
# keys = [f"arg{i}" for i in range(len(args))]
# # Replace position arguments (%s) with named arguments (:arg0, :arg1, ...)
# # query = query % tuple(f":{key}" for key in keys)
# db_args = {}
# for key, db_arg in zip(keys, args):
# # Databricks connector doesn't accept pendulum objects
# if isinstance(db_arg, pendulum.DateTime):
# db_arg = to_py_datetime(db_arg)
# elif isinstance(db_arg, pendulum.Date):
# db_arg = to_py_date(db_arg)
# db_args[key] = db_arg
# else:
# db_args = None
db_args: Optional[Union[Dict[str, Any], Sequence[Any]]]
if kwargs:
db_args = kwargs
elif args:
db_args = args
if args:
keys = [f"arg{i}" for i in range(len(args))]
# Replace position arguments (%s) with named arguments (:arg0, :arg1, ...)
query = query % tuple(f":{key}" for key in keys)
db_args = {}
for key, db_arg in zip(keys, args):
# Databricks connector doesn't accept pendulum objects
if isinstance(db_arg, pendulum.DateTime):
db_arg = to_py_datetime(db_arg)
elif isinstance(db_arg, pendulum.Date):
db_arg = to_py_date(db_arg)
db_args[key] = db_arg
else:
db_args = None
db_args: Optional[Union[Dict[str, Any], Sequence[Any]]]
with self._conn.cursor() as curr:
curr.execute(query, db_args)
yield DBApiCursorImpl(curr) # type: ignore[abstract]
Expand Down
Loading

0 comments on commit 39779e9

Please sign in to comment.