Databricks destination #892
Conversation
def capabilities() -> DestinationCapabilitiesContext:
    caps = DestinationCapabilitiesContext()
    caps.preferred_loader_file_format = "parquet"
    caps.supported_loader_file_formats = ["parquet"]
I could not get `jsonl` to work with `COPY INTO`. I suspect it's because of gzip, i.e. either Databricks doesn't support reading gzipped JSON or, more likely, it requires the file extension `json.gz`.
can you confirm if using a different extension works?
I had a look at the Databricks documentation. Since we have jsonl and not json in our staging location, you probably have to set `multiLine` mode to true. Maybe that'll help, lmk.
https://docs.databricks.com/en/sql/language-manual/delta-copy-into.html
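For reference, a sketch of roughly what the statement would look like with that option; the catalog, schema, table, and staging path below are placeholders, and the exact clauses dlt generates may differ:

```python
# Sketch only: not the statement the Databricks load job actually builds.
copy_into_jsonl = """
    COPY INTO my_catalog.my_schema.my_table
    FROM 's3://my-staging-bucket/dlt-load-package/'
    FILEFORMAT = JSON
    FORMAT_OPTIONS ('multiLine' = 'true')
"""
```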
The multiline mode applies to records that span multiple lines; `jsonl` otherwise works by default, but only with gz compression disabled.
I have to try some script with a `jsonl.gz` filename; it looks like a lot of work to actually change extensions in dlt.
Confirmed that gzip works with the `jsonl.gz` extension.
It might be good to change this so dlt creates files with the `gz` extension, which is pretty universally supported, but I would leave that for another PR.
I just have parquet enabled for now.
@steinitzu my 2 cents:
Is there really no way to set the compression explicitly, i.e. in the copy or format options?
If not, then please do not disable JSONL; just detect that compression is enabled and fail the job with an exception (i.e. ask to disable compression).
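A minimal sketch of that check, assuming `LoadJobTerminalException` can be imported from `dlt.destinations.exceptions` and that the job knows whether compression is enabled; this illustrates the suggestion, it is not the actual dlt implementation:

```python
from dlt.destinations.exceptions import LoadJobTerminalException  # import path assumed


def ensure_jsonl_not_compressed(file_path: str, compression_disabled: bool) -> None:
    # Refuse to run the COPY INTO job for gzip-compressed jsonl files instead of
    # silently disabling the jsonl format altogether.
    if file_path.endswith(".jsonl") and not compression_disabled:
        raise LoadJobTerminalException(
            file_path,
            "Databricks COPY INTO cannot read gzip-compressed jsonl files without a "
            ".gz extension. Set `data_writer.disable_compression` to true in the dlt config.",
        )
```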
Another ask @steinitzu: could you write a ticket to add `gz` extensions to zipped files in dlt load packages?
> @steinitzu my 2 cents: is there really no way to set the compression explicitly, i.e. in the copy or format options?

Hey @rudolfix, no, there's no documented option for it.

> If not, then please do not disable JSONL; just detect that compression is enabled and fail the job with an exception (i.e. ask to disable compression).

Hmm, yeah, that makes sense for now, until we change the extensions.
It fails on empty jsonl files too. Produced by this test: https://github.com/dlt-hub/dlt/blob/sthor%2Fdatabricks/tests/load/pipeline/test_replace_disposition.py#L154-L163
Is this supposed to produce a stage file? Not sure if I'm doing something wrong.
Edit: workaround https://github.com/dlt-hub/dlt/blob/sthor%2Fdatabricks/dlt/destinations/impl/databricks/databricks.py#L187-L190
Yes, empty jsonl files are correct. They mean that the content of the table should be dropped, as there are no rows in the current load. We generate them e.g. to delete data from child tables on replace if there was no data at all for the given table. Your workaround is good and will do what we expected! Thanks.
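To make the intent concrete, a sketch of that workaround idea (names are illustrative; see the linked databricks.py lines for the real code): an empty staged file means the target table should simply be emptied.

```python
import os


def load_staged_file(sql_client, qualified_table_name: str, local_file_path: str, copy_sql: str) -> None:
    # An empty jsonl file in the load package means "no rows for this table",
    # so skip COPY INTO and clear the existing contents instead.
    if os.path.getsize(local_file_path) == 0:
        sql_client.execute_sql(f"TRUNCATE TABLE {qualified_table_name}")
    else:
        sql_client.execute_sql(copy_sql)
```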
Force-pushed from 9b712aa to e45ce07.
very nice work, I have a few comments
"schema", | ||
] | ||
|
||
def __post_init__(self) -> None: |
We also have the on_resolved function that gets called after the configuration is resolved; maybe you can use that, or do you need this to run before?
Yeah, on_resolved should work. I didn't touch these classes much from the previous PR since they seemed to work, but they need some tweaks.
For now I removed a lot of this and am only supporting the "personal access token" auth.
I don't have a way to test OAuth at the moment; it will need to be done somehow so we're not prompting on every connection.
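For illustration, a rough sketch of moving validation into `on_resolved`, assuming dlt's `@configspec` machinery; the class name, fields, and check below are simplified placeholders, not the final credentials class:

```python
from typing import Optional

from dlt.common.configuration import configspec
from dlt.common.configuration.specs import CredentialsConfiguration


@configspec
class DatabricksCredentialsSketch(CredentialsConfiguration):
    server_hostname: Optional[str] = None
    http_path: Optional[str] = None
    access_token: Optional[str] = None  # personal access token auth only for now

    def on_resolved(self) -> None:
        # runs after all configuration values have been resolved,
        # unlike __post_init__ which runs at dataclass construction time
        if not self.access_token:
            raise ValueError(
                "The Databricks destination currently requires a personal access token"
            )
```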
f"Databricks cannot load data from staging bucket {bucket_path}. Only s3 and azure buckets are supported", | ||
) | ||
else: | ||
raise LoadJobTerminalException( |
Somehow I think it would be nice to be able to define in the capabilities which staging filesystem types are supported by each destination. I'm not sure whether to add this now or later, but it would save some of this code in a few places and we could also catch config errors earlier.
Good idea to have this in capabilities. Can do that and refactor in another PR.
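A sketch of how such a shared check could look once the supported protocols are declared somewhere central (the function and protocol list below are hypothetical; nothing like this exists in dlt yet):

```python
from typing import Sequence


def verify_staging_path(bucket_path: str, supported_protocols: Sequence[str]) -> None:
    # Generic early check that could live in shared code instead of each destination,
    # assuming capabilities (or similar) exposed the supported staging protocols.
    if not any(bucket_path.startswith(f"{proto}://") for proto in supported_protocols):
        raise ValueError(
            f"Staging bucket {bucket_path} is not supported; "
            f"expected one of: {', '.join(supported_protocols)}"
        )


# e.g. for Databricks: verify_staging_path(bucket_path, ["s3", "az", "abfss"])
```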
        return sql

    def _execute_schema_update_sql(self, only_tables: Iterable[str]) -> TSchemaTables:
why do you need to override this method? won't the one in the baseclass work?
Now it does. I updated the base class to use `execute_many`. The base sql client implements it as:
https://github.com/dlt-hub/dlt/blob/sthor%2Fdatabricks/dlt/destinations/sql_client.py#L122-L139 depending on capabilities.
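Roughly, the behavior described there (a simplified sketch, not the code at the link): batch all statements into one script when the destination supports multiple statements per query, otherwise run them one at a time.

```python
from typing import Any, Callable, List, Sequence


def execute_many_sketch(
    execute_sql: Callable[[str], Any],
    statements: Sequence[str],
    supports_multiple_statements: bool,
) -> List[Any]:
    if supports_multiple_statements:
        # single round trip with all statements joined into one script
        return [execute_sql("\n".join(statements))]
    # fall back to executing each statement separately
    return [execute_sql(statement) for statement in statements]
```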
Commits added (titles truncated as shown on the PR timeline):
- …configuration and rename classes and methods: DatabricksSqlClient, DatabricksLoadJob
- …instead of database. Add has_dataset method to check if dataset exists.
- …method to use correct query based on presence of catalog in db_params.
- …readability and add optional parameters.
- update COPY INTO statement
Force-pushed from 3f190d6 to 64b7e2e.
LGTM! thanks everyone for the quality work! if @sh-rp agrees we should merge this.
        super().__init__(
            credentials=credentials,
            stage_name=stage_name,
            keep_staged_files=keep_staged_files,
is this used anywhere?
a few small comments again :)
The `jsonl` format has some limitations when used with Databricks:

1. Compression must be disabled to load jsonl files into Databricks. Set `data_writer.disable_compression` to `true` in the dlt config when using this format (see the example after this list).
2. The following data types are not supported when using the `jsonl` format with `databricks`: `decimal`, `complex`, `date`, `binary`. Use `parquet` if your data contains these types.
3. The `bigint` data type with precision is not supported with the `jsonl` format.
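For example, compression can be disabled through dlt's standard configuration mechanism; the snippet below uses the environment-variable form, which (assuming the usual section layout) should be equivalent to setting `disable_compression` under `[normalize.data_writer]` in `config.toml`:

```python
import os

# Double underscores separate config sections in dlt environment variables.
# The exact section path is assumed here; adjust if your config layout differs.
os.environ["NORMALIZE__DATA_WRITER__DISABLE_COMPRESSION"] = "true"
```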
@sh-rp @rudolfix
JSON support is even more limited. All of these came up in tests/load/pipeline/test_stage_loading.py::test_all_data_types:

- `binary` in base64 does not work; not sure if it works at all in some other encoding.
- Databricks parses `date` strings to timestamp and fails.
- There is no general `JSON` or equivalent type (I think); you need to create a struct column with a defined schema. And JSON objects/arrays don't convert automatically to string.
- Somehow a 16-bit int in JSON is detected as LONG.
I definitely think we should contact Databricks about this and see what they say. @rudolfix who do we know there?
Description
Draft of databricks destination #762
Related Issues
Additional Context
At the moment `COPY INTO` is always used and requires an `s3`/`azure` staging bucket.
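For context, a minimal usage sketch with a filesystem staging bucket, assuming the destination is exposed as `destination="databricks"`; pipeline name, dataset, and data are placeholders, and the bucket credentials are expected in the usual dlt secrets/config:

```python
import dlt

pipeline = dlt.pipeline(
    pipeline_name="databricks_demo",
    destination="databricks",
    staging="filesystem",  # s3:// or az:// bucket configured for filesystem staging
    dataset_name="demo_dataset",
)

load_info = pipeline.run(
    [{"id": 1, "name": "alice"}, {"id": 2, "name": "bob"}],
    table_name="users",
)
print(load_info)
```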