
Databricks destination #892

Merged
merged 55 commits on Feb 5, 2024

Commits (55)
13e6a0e
Add escape_databricks_identifier function to
phillem15 Nov 9, 2023
6c3e8af
Add Databricks destination client and
phillem15 Nov 9, 2023
e941f4f
Refactor Databricks SQL client to use new API
phillem15 Nov 10, 2023
6bc1e4f
Refactor Databricks SQL client code
phillem15 Nov 10, 2023
8cd57b4
Refactor DatabricksCredentials configuration class
phillem15 Nov 10, 2023
d3dd5f5
Implement commit_transaction method in
phillem15 Nov 12, 2023
a2a53fc
Fix DatabricksCredentials host and http_path
phillem15 Nov 12, 2023
b630d31
Refactor DatabricksTypeMapper and
phillem15 Nov 12, 2023
c0ce477
Add support for secret string values in Databricks
phillem15 Nov 12, 2023
b85b930
Fix DatabricksSqlClient super call to use catalog
phillem15 Nov 12, 2023
f6aac09
Update Databricks credentials configuration
phillem15 Nov 12, 2023
f2ff181
Update Databricks destination capabilities and SQL
phillem15 Nov 13, 2023
1e9992d
Update file formats for Databricks staging
phillem15 Nov 13, 2023
60e9b8b
Refactored DatabricksSqlClient.has_dataset()
phillem15 Nov 13, 2023
f82bc53
Refactor DatabricksLoadJob constructor to improve
phillem15 Nov 13, 2023
640a04b
a few small changes
sh-rp Nov 15, 2023
0d98248
Add and comment execute fragments method
phillem15 Nov 20, 2023
69404cc
Update staging file format preference
phillem15 Nov 20, 2023
635fd7e
Refactor execute_fragments method
phillem15 Nov 20, 2023
01604bb
Fix DatabricksLoadJob constructor arguments and
phillem15 Nov 20, 2023
d9fbcda
Update Databricks destination capabilities
phillem15 Dec 14, 2023
749aa11
Update Databricks destination code
phillem15 Dec 14, 2023
e0fdf3f
Fix SQL execution in SqlLoadJob
phillem15 Jan 6, 2024
8955cd9
Add SqlMergeJob to DatabricksLoadJob
phillem15 Jan 6, 2024
38357a8
Move databricks to new destination layout
steinitzu Jan 9, 2024
058489c
Databricks dependency
steinitzu Jan 9, 2024
ed659f8
Add databricks destination_type
steinitzu Jan 9, 2024
36d1718
Testing databricks with s3 staging
steinitzu Jan 10, 2024
ff7d7c0
Type mapping fixes for databricks
steinitzu Jan 10, 2024
7dbd51e
Fix some databricks bugs
steinitzu Jan 11, 2024
ac916c8
Databricks parquet only
steinitzu Jan 12, 2024
efd965f
Init databricks ci
steinitzu Jan 15, 2024
f843731
Lint, cleanup
steinitzu Jan 15, 2024
52ef939
Support databricks insert_values
steinitzu Jan 15, 2024
4cd91a8
Databricks merge disposition support
steinitzu Jan 16, 2024
444d196
Fix string escaping
steinitzu Jan 16, 2024
ed3d58b
Remove keep-staged-files option
steinitzu Jan 16, 2024
1fcdb5d
Exceptions fix, binary escape
steinitzu Jan 17, 2024
e2c9aed
databricks dbt profile
steinitzu Jan 17, 2024
d6584d9
Handle databricks 2.9 paramstyle
steinitzu Jan 17, 2024
e63de19
Databricks docs
steinitzu Jan 18, 2024
dad726a
Remove debug raise
steinitzu Jan 18, 2024
3fad7e4
Fix sql load job
steinitzu Jan 19, 2024
b3cb533
Revert debug
steinitzu Jan 26, 2024
07a5eec
Typo fix
steinitzu Jan 26, 2024
a615a0a
General execute_many method in base sql client
steinitzu Jan 26, 2024
051e9d3
Databricks client cleanup
steinitzu Jan 26, 2024
64b7e2e
Implement staging clone table in base class
steinitzu Jan 26, 2024
3076700
Personal access token auth only
steinitzu Jan 30, 2024
0f32964
stage_name is not relevant
steinitzu Jan 30, 2024
5d212a9
databricks jsonl without compression
steinitzu Feb 1, 2024
1e548e0
Update jsonl in docs
steinitzu Feb 1, 2024
e8c08e2
Check and ignore empty json files in load job
steinitzu Feb 1, 2024
2d50cc7
Cleanup
steinitzu Feb 2, 2024
dc984df
Update unsupported data types
steinitzu Feb 2, 2024
24 changes: 24 additions & 0 deletions dlt/destinations/impl/databricks/databricks.py
@@ -19,6 +19,7 @@
from dlt.common.storages.file_storage import FileStorage
from dlt.common.schema import TColumnSchema, Schema, TTableSchemaColumns
from dlt.common.schema.typing import TTableSchema, TColumnType, TSchemaTables, TTableFormat
from dlt.common.schema.utils import table_schema_has_type


from dlt.destinations.insert_job_client import InsertValuesJobClient
@@ -104,6 +105,7 @@ def from_db_type(
class DatabricksLoadJob(LoadJob, FollowupJob):
def __init__(
self,
table: TTableSchema,
file_path: str,
table_name: str,
load_id: str,
@@ -181,6 +183,27 @@ def __init__(
file_path,
"Databricks loader does not support gzip compressed JSON files. Please disable compression in the data writer configuration: https://dlthub.com/docs/reference/performance#disabling-and-enabling-file-compression",
)
if table_schema_has_type(table, "decimal"):
raise LoadJobTerminalException(
file_path,
"Databricks loader cannot load DECIMAL type columns from json files. Switch to parquet format to load decimals.",
)
if table_schema_has_type(table, "binary"):
raise LoadJobTerminalException(
file_path,
"Databricks loader cannot load BINARY type columns from json files. Switch to parquet format to load byte values.",
)
if table_schema_has_type(table, "complex"):
raise LoadJobTerminalException(
file_path,
"Databricks loader cannot load complex columns (lists and dicts) from json files. Switch to parquet format to load complex types.",
)
if table_schema_has_type(table, "date"):
raise LoadJobTerminalException(
file_path,
"Databricks loader cannot load DATE type columns from json files. Switch to parquet format to load dates.",
)

source_format = "JSON"
format_options_clause = "FORMAT_OPTIONS('inferTimestamp'='true')"
# Databricks fails when trying to load empty json files, so we have to check the file size
@@ -236,6 +259,7 @@ def start_file_load(self, table: TTableSchema, file_path: str, load_id: str) ->

if not job:
job = DatabricksLoadJob(
table,
file_path,
table["name"],
load_id,
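For context on the `source_format` and `format_options_clause` values set in the load job above: they end up in a Databricks `COPY INTO` statement. A rough, illustrative sketch of that statement's shape for a staged jsonl file, with placeholder table and file names (this is not the exact code from this PR):

```python
# Illustrative sketch only: roughly the COPY INTO statement built for a staged
# jsonl file. The table name and staged file URL below are placeholders.
qualified_table_name = "my_catalog.my_schema.my_table"
staged_file_url = "s3://my-bucket/my_pipeline/events.jsonl"

source_format = "JSON"
format_options_clause = "FORMAT_OPTIONS('inferTimestamp'='true')"

copy_into_sql = f"""
COPY INTO {qualified_table_name}
FROM '{staged_file_url}'
FILEFORMAT = {source_format}
{format_options_clause}
"""
print(copy_into_sql)
```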
19 changes: 13 additions & 6 deletions docs/website/docs/dlt-ecosystem/destinations/databricks.md
@@ -52,20 +52,25 @@ For more information on staging, see the [staging support](#staging-support) sec

## Supported file formats
* [insert-values](../file-formats/insert-format.md) is used by default
* [jsonl](../file-formats/jsonl.md) supported when staging is enabled. **Note**: Currently loading compressed jsonl files is not supported. `data_writer.disable_compression` should be set to `true` in dlt config
* [jsonl](../file-formats/jsonl.md) supported when staging is enabled (see limitations below)
* [parquet](../file-formats/parquet.md) supported when staging is enabled

The `jsonl` format has some limitations when used with Databricks:

1. Compression must be disabled to load jsonl files into Databricks. Set `data_writer.disable_compression` to `true` in the dlt config when using this format (see the sketch after this list).
2. The following data types are not supported when using the `jsonl` format with `databricks`: `decimal`, `complex`, `date`, `binary`. Use `parquet` if your data contains these types.
3. The `bigint` data type with precision is not supported with the `jsonl` format.
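A minimal sketch for limitation 1, assuming the environment-variable spelling of the `data_writer.disable_compression` setting (the exact section path is an assumption; setting it in `config.toml` works just as well):

```python
import os

# Assumed env-var form of data_writer.disable_compression; adjust the section
# path if your dlt config layout differs, or set it in config.toml instead.
os.environ["DATA_WRITER__DISABLE_COMPRESSION"] = "true"
```

With compression disabled, the staged files are plain `.jsonl`, which is what the Databricks loader expects; tables containing the types listed in limitation 2 still need `parquet`.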

Comment on lines +58 to +63

Collaborator (Author) commented:

@sh-rp @rudolfix
JSON support is even more limited. All of these came up in tests/load/pipeline/test_stage_loading.py::test_all_data_types:

* binary in base64 does not work; not sure if it works at all in some other encoding.
* Databricks parses date strings to timestamp and fails.
* There is no general JSON or equivalent type (I think); you need to create a struct column with a defined schema. And JSON objects/arrays don't convert automatically to string.
* Somehow a 16 bit int in JSON is detected as LONG.

Collaborator replied:

I definitely think we should contact Databricks about this and see what they say. @rudolfix who do we know there?

## Staging support

Databricks supports both Amazon S3 and Azure Blob Storage as staging locations. `dlt` will upload files in `parquet` format to the staging location and will instruct Databricks to load data from there.

### Databricks and Amazon S3

Please refer to the [S3 documentation](./filesystem.md#aws-s3) to learn how to set up your bucket with the bucket_url and credentials. For s3, the dlt Databricks loader will use the AWS credentials provided for s3 to access the s3 bucket if not specified otherwise (see config options below). You can specify your s3 bucket directly in your dlt configuration:
Please refer to the [S3 documentation](./filesystem.md#aws-s3) for details on connecting your s3 bucket with the bucket_url and credentials.

To set up Databricks with s3 as a staging destination:
Example to set up Databricks with s3 as a staging destination:

```python
import dlt
```
@@ -83,7 +88,9 @@ pipeline = dlt.pipeline(

### Databricks and Azure Blob Storage

Refer to the [Azure Blob Storage filesystem documentation](./filesystem.md#azure-blob-storage) for setting up your container with the bucket_url and credentials. For Azure Blob Storage, Databricks can directly load data from the storage container specified in the configuration:
Refer to the [Azure Blob Storage filesystem documentation](./filesystem.md#azure-blob-storage) for details on connecting your Azure Blob Storage container with the bucket_url and credentials.

Example to set up Databricks with Azure as a staging destination:

```python
# Create a dlt pipeline that will load
```
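The code examples in the diff above are collapsed. A sketch of the pattern they follow, based on how other dlt staging destinations are configured; the names, and the assumption that the bucket URL and credentials live in your dlt config, are illustrative rather than content from this PR:

```python
import dlt

# Illustrative sketch: Databricks destination with filesystem staging enabled.
# Pipeline and dataset names are placeholders. The staging bucket URL (an s3://
# bucket or an az:// container) and all credentials are assumed to come from
# your dlt config/secrets, e.g. under [destination.filesystem] in secrets.toml.
pipeline = dlt.pipeline(
    pipeline_name="databricks_pipeline",
    destination="databricks",
    staging="filesystem",  # activates the staging location
    dataset_name="main",
)

# parquet supports the widest range of data types on Databricks:
# load_info = pipeline.run(my_source(), loader_file_format="parquet")
```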
3 changes: 3 additions & 0 deletions tests/load/pipeline/test_stage_loading.py
@@ -168,6 +168,9 @@ def test_all_data_types(destination_config: DestinationTestConfiguration) -> Non
):
# Redshift can't load fixed width binary columns from parquet
exclude_columns.append("col7_precision")
if destination_config.destination == "databricks" and destination_config.file_format == "jsonl":
exclude_types.extend(["decimal", "binary", "wei", "complex", "date"])
exclude_columns.append("col1_precision")

column_schemas, data_types = table_update_and_row(
exclude_types=exclude_types, exclude_columns=exclude_columns