databricks jsonl without compression
steinitzu committed Feb 1, 2024
Parent: 0f32964 · Commit: 5d212a9
Showing 3 changed files with 34 additions and 4 deletions.
dlt/destinations/impl/databricks/__init__.py (2 additions, 2 deletions)
@@ -9,8 +9,8 @@ def capabilities() -> DestinationCapabilitiesContext:
     caps = DestinationCapabilitiesContext()
     caps.preferred_loader_file_format = "insert_values"
     caps.supported_loader_file_formats = ["insert_values"]
-    caps.preferred_staging_file_format = "parquet"
-    caps.supported_staging_file_formats = ["parquet"]
+    caps.preferred_staging_file_format = "jsonl"
+    caps.supported_staging_file_formats = ["jsonl", "parquet"]
     caps.escape_identifier = escape_databricks_identifier
     caps.escape_literal = escape_databricks_literal
     caps.decimal_precision = (DEFAULT_NUMERIC_PRECISION, DEFAULT_NUMERIC_SCALE)
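With jsonl promoted to the preferred staging format, a pipeline can request it explicitly. A minimal sketch, assuming the staging bucket and Databricks credentials are configured via the usual dlt config/secrets; the pipeline, dataset, and table names here are hypothetical:

```python
import dlt

# Hypothetical pipeline: after this change, "jsonl" is a valid (and the
# preferred) staging file format for the databricks destination.
pipeline = dlt.pipeline(
    pipeline_name="databricks_jsonl_demo",
    destination="databricks",
    staging="filesystem",  # files are staged in a bucket, then COPY'd in
    dataset_name="demo",
)

# loader_file_format selects the staging format; "parquet" remains valid too.
info = pipeline.run(
    [{"id": 1, "name": "alice"}],
    table_name="users",
    loader_file_format="jsonl",
)
print(info)
```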
dlt/destinations/impl/databricks/databricks.py (14 additions, 1 deletion)
@@ -32,6 +32,7 @@
 from dlt.destinations.job_impl import NewReferenceJob
 from dlt.destinations.sql_client import SqlClientBase
 from dlt.destinations.type_mapping import TypeMapper
+from dlt import config


 class DatabricksTypeMapper(TypeMapper):
@@ -124,6 +125,7 @@ def __init__(
         )
         from_clause = ""
         credentials_clause = ""
+        format_options_clause = ""

         if bucket_path:
             bucket_url = urlparse(bucket_path)
@@ -138,6 +140,7 @@
credentials_clause = f"""WITH(CREDENTIAL(
AWS_ACCESS_KEY='{s3_creds["aws_access_key_id"]}',
AWS_SECRET_KEY='{s3_creds["aws_secret_access_key"]}',
AWS_SESSION_TOKEN='{s3_creds["aws_session_token"]}'
))
"""
@@ -172,12 +175,22 @@
         )

         # decide on source format, stage_file_path will either be a local file or a bucket path
-        source_format = "PARQUET"  # Only parquet is supported
+        if file_name.endswith(".parquet"):
+            source_format = "PARQUET"
+        elif file_name.endswith(".jsonl"):
+            if not config.get("data_writer.disable_compression"):
+                raise LoadJobTerminalException(
+                    file_path,
+                    "Databricks loader does not support gzip compressed JSON files. Please disable compression in the data writer configuration: https://dlthub.com/docs/reference/performance#disabling-and-enabling-file-compression",
+                )
+            source_format = "JSON"
+            format_options_clause = "FORMAT_OPTIONS('inferTimestamp'='true')"

         statement = f"""COPY INTO {qualified_table_name}
             {from_clause}
             {credentials_clause}
             FILEFORMAT = {source_format}
+            {format_options_clause}
             """
         client.execute_sql(statement)
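For illustration, roughly what the COPY INTO template above renders to when a .jsonl file is staged on S3. A sketch only: the table name, bucket path, and credential values are placeholders, not loader output:

```python
# Approximate rendered statement for a staged users.jsonl (all values are
# placeholders); FILEFORMAT and FORMAT_OPTIONS come from the jsonl branch above.
statement = """COPY INTO `hive_metastore`.`demo`.`users`
FROM 's3://my-bucket/pipeline/users.jsonl'
WITH(CREDENTIAL(
    AWS_ACCESS_KEY='<access-key>',
    AWS_SECRET_KEY='<secret-key>',
    AWS_SESSION_TOKEN='<session-token>'
))
FILEFORMAT = JSON
FORMAT_OPTIONS('inferTimestamp'='true')
"""
```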

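Because the loader now rejects gzip-compressed JSONL outright, compression must be turned off in the data writer before staging. A minimal sketch using the environment-variable form, the same knob the test setup below uses; per the docs linked in the error message, a `[data_writer]` `disable_compression=true` entry in config.toml should work as well:

```python
import os

# Turn off gzip for written load files so the staged .jsonl can be read
# by Databricks' COPY INTO ... FILEFORMAT = JSON.
os.environ["DATA_WRITER__DISABLE_COMPRESSION"] = "True"
```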
tests/load/utils.py (18 additions, 1 deletion)
@@ -99,6 +99,7 @@ class DestinationTestConfiguration:
     supports_merge: bool = True  # TODO: take it from client base class
     force_iceberg: bool = False
     supports_dbt: bool = True
+    disable_compression: bool = False

     @property
     def name(self) -> str:
@@ -121,7 +122,7 @@ def setup(self) -> None:
os.environ["DESTINATION__FORCE_ICEBERG"] = str(self.force_iceberg) or ""

"""For the filesystem destinations we disable compression to make analyzing the result easier"""
if self.destination == "filesystem":
if self.destination == "filesystem" or self.disable_compression:
os.environ["DATA_WRITER__DISABLE_COMPRESSION"] = "True"

def setup_pipeline(
@@ -250,6 +251,22 @@ def destinations_configs(
             bucket_url=AZ_BUCKET,
             extra_info="az-authorization",
         ),
+        DestinationTestConfiguration(
+            destination="databricks",
+            staging="filesystem",
+            file_format="jsonl",
+            bucket_url=AWS_BUCKET,
+            extra_info="s3-authorization",
+            disable_compression=True,
+        ),
+        DestinationTestConfiguration(
+            destination="databricks",
+            staging="filesystem",
+            file_format="jsonl",
+            bucket_url=AZ_BUCKET,
+            extra_info="az-authorization",
+            disable_compression=True,
+        ),
         DestinationTestConfiguration(
             destination="databricks",
             staging="filesystem",
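To see the new flag end to end, a small hypothetical usage of the test configuration: `setup()` now exports the compression override whenever `disable_compression` is set, not only for the filesystem destination. The import path assumes the dlt repo layout:

```python
import os

from tests.load.utils import DestinationTestConfiguration, AWS_BUCKET

# Hypothetical standalone use of the new flag (normally driven by the
# parametrized destinations_configs fixtures above).
cfg = DestinationTestConfiguration(
    destination="databricks",
    staging="filesystem",
    file_format="jsonl",
    bucket_url=AWS_BUCKET,
    disable_compression=True,
)
cfg.setup()  # among other env vars, disables data writer compression
assert os.environ["DATA_WRITER__DISABLE_COMPRESSION"] == "True"
```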
