enables gcs staging databricks (#1933)
* enables gcs staging for Databricks via named credential

* bumps dlt to 1.2.0

* Update docs/website/docs/dlt-ecosystem/destinations/databricks.md

* Update docs/website/docs/dlt-ecosystem/destinations/databricks.md

* fixes gcs databricks test

---------

Co-authored-by: Anton Burnashev <[email protected]>
rudolfix and burnash authored Oct 7, 2024
1 parent b5b3ab1 commit 2d07a43
Showing 4 changed files with 77 additions and 9 deletions.
16 changes: 12 additions & 4 deletions dlt/destinations/impl/databricks/databricks.py
@@ -33,6 +33,7 @@
from dlt.destinations.job_impl import ReferenceFollowupJobRequest

AZURE_BLOB_STORAGE_PROTOCOLS = ["az", "abfss", "abfs"]
SUPPORTED_BLOB_STORAGE_PROTOCOLS = AZURE_BLOB_STORAGE_PROTOCOLS + ["s3", "gs", "gcs"]


class DatabricksLoadJob(RunnableLoadJob, HasFollowupJobs):
@@ -69,11 +70,12 @@ def run(self) -> None:
bucket_url = urlparse(bucket_path)
bucket_scheme = bucket_url.scheme

if bucket_scheme not in AZURE_BLOB_STORAGE_PROTOCOLS + ["s3"]:
if bucket_scheme not in SUPPORTED_BLOB_STORAGE_PROTOCOLS:
raise LoadJobTerminalException(
self._file_path,
f"Databricks cannot load data from staging bucket {bucket_path}. Only s3 and"
" azure buckets are supported",
f"Databricks cannot load data from staging bucket {bucket_path}. Only s3, azure"
" and gcs buckets are supported. Please note that gcs buckets are supported"
" only via named credential",
)

if self._job_client.config.is_staging_external_location:
Expand Down Expand Up @@ -106,6 +108,12 @@ def run(self) -> None:
bucket_path = self.ensure_databricks_abfss_url(
bucket_path, staging_credentials.azure_storage_account_name
)
else:
raise LoadJobTerminalException(
self._file_path,
"You need to use Databricks named credential to use google storage."
" Passing explicit Google credentials is not supported by Databricks.",
)

if bucket_scheme in AZURE_BLOB_STORAGE_PROTOCOLS:
assert isinstance(
@@ -125,7 +133,7 @@ def run(self) -> None:
raise LoadJobTerminalException(
self._file_path,
"Cannot load from local file. Databricks does not support loading from local files."
" Configure staging with an s3 or azure storage bucket.",
" Configure staging with an s3, azure or google storage bucket.",
)

# decide on source format, stage_file_path will either be a local file or a bucket path
7 changes: 6 additions & 1 deletion docs/website/docs/dlt-ecosystem/destinations/databricks.md
@@ -141,7 +141,7 @@ The `jsonl` format has some limitations when used with Databricks:

## Staging support

Databricks supports both Amazon S3 and Azure Blob Storage as staging locations. `dlt` will upload files in `parquet` format to the staging location and will instruct Databricks to load data from there.
Databricks supports Amazon S3, Azure Blob Storage, and Google Cloud Storage as staging locations. `dlt` will upload files in `parquet` format to the staging location and will instruct Databricks to load data from there.
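
A minimal configuration sketch (the pipeline and dataset names are placeholders; the staging bucket URL and its credentials are assumed to be set in `config.toml`/`secrets.toml`):

```py
import dlt

# sketch: Databricks destination with filesystem staging
# the bucket_url and credentials of the staging filesystem are assumed
# to be configured in config.toml / secrets.toml
pipeline = dlt.pipeline(
    pipeline_name="databricks_staging_example",  # placeholder name
    destination="databricks",
    staging="filesystem",  # parquet files are uploaded here, then loaded via COPY INTO
    dataset_name="example_data",  # placeholder dataset
)
```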

### Databricks and Amazon S3

@@ -187,6 +187,11 @@ pipeline = dlt.pipeline(

```

### Databricks and Google Cloud Storage

To load data from a Google Cloud Storage staging bucket, you must set up the credentials via a **named credential**; see below. Databricks does not allow passing Google credentials
explicitly in SQL statements.
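
For illustration, a sketch of this setup, mirroring the test added in this commit (the bucket URL and the credential name `my_gcs_credential` are placeholders):

```py
import dlt
from dlt.destinations import databricks, filesystem

# GCS staging bucket (placeholder URL)
stage = filesystem("gs://my-staging-bucket")

# reference a credential defined on the Databricks side instead of
# passing Google credentials explicitly
bricks = databricks(staging_credentials_name="my_gcs_credential")

pipeline = dlt.pipeline(
    pipeline_name="databricks_gcs_example",  # placeholder name
    destination=bricks,
    staging=stage,
    dataset_name="example_data",  # placeholder dataset
)
```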

### Use external locations and stored credentials
`dlt` forwards bucket credentials to the `COPY INTO` SQL command by default. You may prefer to use [external locations or stored credentials instead](https://docs.databricks.com/en/sql/language-manual/sql-ref-external-locations.html#external-location) that are stored on the Databricks side.
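
A sketch of both options, using the factory parameters exercised by the new tests (`is_staging_external_location` and `staging_credentials_name`; the credential name is a placeholder):

```py
from dlt.destinations import databricks

# option 1: the staging bucket is registered as an external location in
# Databricks, so no credentials are forwarded to COPY INTO
bricks = databricks(is_staging_external_location=True)

# option 2: use a credential stored on the Databricks side, referenced by name
bricks = databricks(staging_credentials_name="my_stored_credential")
```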

2 changes: 1 addition & 1 deletion pyproject.toml
@@ -1,6 +1,6 @@
[tool.poetry]
name = "dlt"
version = "1.1.0"
version = "1.2.0"
description = "dlt is an open-source python-first scalable data loading library that does not require any backend to run."
authors = ["dltHub Inc. <[email protected]>"]
maintainers = [ "Marcin Rudolf <[email protected]>", "Adrian Brudaru <[email protected]>", "Anton Burnashev <[email protected]>", "David Scharf <[email protected]>" ]
61 changes: 58 additions & 3 deletions tests/load/pipeline/test_databricks_pipeline.py
@@ -2,7 +2,12 @@
import os

from dlt.common.utils import uniq_id
from tests.load.utils import DestinationTestConfiguration, destinations_configs, AZ_BUCKET
from tests.load.utils import (
GCS_BUCKET,
DestinationTestConfiguration,
destinations_configs,
AZ_BUCKET,
)
from tests.pipeline.utils import assert_load_info


@@ -13,7 +18,7 @@
@pytest.mark.parametrize(
"destination_config",
destinations_configs(
default_sql_configs=True, bucket_subset=(AZ_BUCKET), subset=("databricks",)
default_sql_configs=True, bucket_subset=(AZ_BUCKET,), subset=("databricks",)
),
ids=lambda x: x.name,
)
@@ -62,7 +67,7 @@ def test_databricks_external_location(destination_config: DestinationTestConfigu
in pipeline.list_failed_jobs_in_package(info.loads_ids[0])[0].failed_message
)

# # should fail on non existing stored credentials
# should fail on non existing stored credentials
bricks = databricks(is_staging_external_location=False, staging_credentials_name="CREDENTIAL_X")
pipeline = destination_config.setup_pipeline(
"test_databricks_external_location",
Expand Down Expand Up @@ -90,3 +95,53 @@ def test_databricks_external_location(destination_config: DestinationTestConfigu
assert (
"credential_x" in pipeline.list_failed_jobs_in_package(info.loads_ids[0])[0].failed_message
)


@pytest.mark.parametrize(
"destination_config",
destinations_configs(
default_sql_configs=True, bucket_subset=(AZ_BUCKET,), subset=("databricks",)
),
ids=lambda x: x.name,
)
def test_databricks_gcs_external_location(destination_config: DestinationTestConfiguration) -> None:
# do not interfere with state
os.environ["RESTORE_FROM_DESTINATION"] = "False"
# let the package complete even with failed jobs
os.environ["RAISE_ON_FAILED_JOBS"] = "false"

dataset_name = "test_databricks_gcs_external_location" + uniq_id()

# swap AZ bucket for GCS_BUCKET
from dlt.destinations import databricks, filesystem

stage = filesystem(GCS_BUCKET)

# explicit cred handover should fail
bricks = databricks()
pipeline = destination_config.setup_pipeline(
"test_databricks_gcs_external_location",
dataset_name=dataset_name,
destination=bricks,
staging=stage,
)
info = pipeline.run([1, 2, 3], table_name="digits", **destination_config.run_kwargs)
assert info.has_failed_jobs is True
assert (
"You need to use Databricks named credential"
in pipeline.list_failed_jobs_in_package(info.loads_ids[0])[0].failed_message
)

# should fail on non existing stored credentials
bricks = databricks(is_staging_external_location=False, staging_credentials_name="CREDENTIAL_X")
pipeline = destination_config.setup_pipeline(
"test_databricks_external_location",
dataset_name=dataset_name,
destination=bricks,
staging=stage,
)
info = pipeline.run([1, 2, 3], table_name="digits", **destination_config.run_kwargs)
assert info.has_failed_jobs is True
assert (
"credential_x" in pipeline.list_failed_jobs_in_package(info.loads_ids[0])[0].failed_message
)
