Skip to content

Commit

Permalink
add native implementation for databricks
Browse files Browse the repository at this point in the history
  • Loading branch information
sh-rp committed Aug 7, 2024
1 parent 3296e63 commit 6f6500f
Showing 1 changed file with 15 additions and 12 deletions.
27 changes: 15 additions & 12 deletions dlt/destinations/impl/databricks/sql_client.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
from contextlib import contextmanager, suppress
from typing import Any, AnyStr, ClassVar, Iterator, Optional, Sequence, List, Tuple, Union, Dict
from typing import Any, AnyStr, ClassVar, Generator, Iterator, Optional, Sequence, List, Tuple, Union, Dict


from databricks import sql as databricks_lib
Expand All @@ -21,7 +21,7 @@
raise_database_error,
raise_open_connection_error,
)
from dlt.destinations.typing import DBApi, DBApiCursor, DBTransaction, DataFrame
from dlt.destinations.typing import ArrowTable, DBApi, DBApiCursor, DBTransaction, DataFrame
from dlt.destinations.impl.databricks.configuration import DatabricksCredentials


Expand All @@ -31,17 +31,20 @@ class DatabricksCursorImpl(DBApiCursorImpl):
native_cursor: DatabricksSqlCursor # type: ignore[assignment]
vector_size: ClassVar[int] = 2048 # vector size is 2048

def df(self, chunk_size: int = None, **kwargs: Any) -> DataFrame:
def iter_arrow(self, chunk_size: int) -> Generator[ArrowTable, None, None]:
if chunk_size is None:
return self.native_cursor.fetchall_arrow().to_pandas()
else:
df = self.native_cursor.fetchmany_arrow(chunk_size).to_pandas()
if df.shape[0] == 0:
return None
else:
return df


yield self.native_cursor.fetchall_arrow()
return
while True:
table = self.native_cursor.fetchmany_arrow(chunk_size)
if table.num_rows == 0:
return
yield table

def iter_df(self, chunk_size: int) -> Generator[DataFrame, None, None]:
for table in self.iter_arrow(chunk_size=chunk_size):
yield table.to_pandas()

class DatabricksSqlClient(SqlClientBase[DatabricksSqlConnection], DBTransaction):
dbapi: ClassVar[DBApi] = databricks_lib

Expand Down

0 comments on commit 6f6500f

Please sign in to comment.