Skip to content

Commit

Permalink
update dependencies and fix bug on has_dataset
Browse files Browse the repository at this point in the history
  • Loading branch information
oonyoontong committed Jun 3, 2024
1 parent bad7645 commit 955eabc
Show file tree
Hide file tree
Showing 3 changed files with 404 additions and 185 deletions.
45 changes: 26 additions & 19 deletions dlt/destinations/impl/databricks/sql_client.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
from contextlib import contextmanager, suppress
from typing import Any, AnyStr, ClassVar, Iterator, Optional, Sequence, List, Union, Dict

import pendulum
from databricks import sql as databricks_lib
from databricks.sql.client import (
Connection as DatabricksSqlConnection,
Expand Down Expand Up @@ -65,6 +66,18 @@ def rollback_transaction(self) -> None:
def native_connection(self) -> "DatabricksSqlConnection":
    """Return the connection object stored on this client as ``_conn``."""
    connection = self._conn
    return connection

def has_dataset(self) -> bool:
    """Check whether the client's dataset (schema) exists.

    The schema is looked up in ``INFORMATION_SCHEMA.SCHEMATA``. The unescaped
    fully qualified dataset name is split on ``.``; when it has exactly two
    parts, a ``catalog_name`` filter is placed in front of the
    ``schema_name`` filter and both parts are passed as query parameters.

    Returns:
        True if the lookup yields at least one row, False if the result is
        empty or ``None``.
    """
    query = """
        SELECT CATALOG_NAME
        FROM INFORMATION_SCHEMA.SCHEMATA
        WHERE """
    db_params = self.fully_qualified_dataset_name(escape=False).split(".", 2)
    if len(db_params) == 2:
        # Two parts: filter on the catalog as well as the schema.
        query += " catalog_name = %s AND "
    query += "schema_name = %s"
    rows = self.execute_sql(query, *db_params)
    # Truthiness rather than len(): an empty result and a None result both
    # mean the dataset was not found, and None no longer raises TypeError.
    return bool(rows)

def drop_dataset(self) -> None:
    """Drop the client's dataset (schema) and everything it contains.

    Runs ``DROP SCHEMA IF EXISTS ... CASCADE`` against the fully qualified
    dataset name.
    """
    qualified_name = self.fully_qualified_dataset_name()
    statement = "DROP SCHEMA IF EXISTS %s CASCADE;" % qualified_name
    self.execute_sql(statement)

Expand All @@ -89,27 +102,21 @@ def execute_sql(
def execute_query(self, query: AnyStr, *args: Any, **kwargs: Any) -> Iterator[DBApiCursor]:
curr: DBApiCursor = None
# TODO: databricks connector 3.0.0 will use :named paramstyle only
# if args:
# keys = [f"arg{i}" for i in range(len(args))]
# # Replace position arguments (%s) with named arguments (:arg0, :arg1, ...)
# # query = query % tuple(f":{key}" for key in keys)
# db_args = {}
# for key, db_arg in zip(keys, args):
# # Databricks connector doesn't accept pendulum objects
# if isinstance(db_arg, pendulum.DateTime):
# db_arg = to_py_datetime(db_arg)
# elif isinstance(db_arg, pendulum.Date):
# db_arg = to_py_date(db_arg)
# db_args[key] = db_arg
# else:
# db_args = None
db_args: Optional[Union[Dict[str, Any], Sequence[Any]]]
if kwargs:
db_args = kwargs
elif args:
db_args = args
if args:
keys = [f"arg{i}" for i in range(len(args))]
# Replace position arguments (%s) with named arguments (:arg0, :arg1, ...)
query = query % tuple(f":{key}" for key in keys)
db_args = {}
for key, db_arg in zip(keys, args):
# Databricks connector doesn't accept pendulum objects
if isinstance(db_arg, pendulum.DateTime):
db_arg = to_py_datetime(db_arg)
elif isinstance(db_arg, pendulum.Date):
db_arg = to_py_date(db_arg)
db_args[key] = db_arg
else:
db_args = None
db_args: Optional[Union[Dict[str, Any], Sequence[Any]]]
with self._conn.cursor() as curr:
curr.execute(query, db_args)
yield DBApiCursorImpl(curr) # type: ignore[abstract]
Expand Down
Loading

0 comments on commit 955eabc

Please sign in to comment.