-
Notifications
You must be signed in to change notification settings - Fork 192
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Extend custom destination #1107
Conversation
✅ Deploy Preview for dlt-hub-docs canceled.
|
dlt/pipeline/pipeline.py
Outdated
@@ -456,7 +456,18 @@ def normalize( | |||
return None | |||
|
|||
# make sure destination capabilities are available | |||
self._get_destination_capabilities() | |||
caps = self._get_destination_capabilities() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@rudolfix i need some guidance where to inject / overwrite the max_nesting_level coming from a destination. I realize this place is very likely not the right one, but I am not sure where and how to do it. Should I get the capabilities context in the relationalnormalizer and not persist this setting to the schema at all, or what is the best way?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
the only thing you need to do is to fix NormalizersConfiguration
def on_resolved(self) -> None:
# get naming from capabilities if not present
if self.naming is None:
if self.destination_capabilities:
self.naming = self.destination_capabilities.naming_convention
detect the type of the json normalizer and apply the settings to it like the below. you can override existing settings. I think capabilities (if not None) should have precedence over the source settings.
what happens later:
when new schema is created this setting will be used
when schema is loaded - it will not but when we call update_normalize - it will. and we do that in normalizer (in schema.clone). so it should work!
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
since this is set on the nested json normalizer settings i had to change a bit more but not much, I hope it is ok to change the type from mapping to dict in there.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
this is good! see the trick to apply max nesting to schema
@@ -23,6 +23,8 @@ def destination( | |||
batch_size: int = 10, | |||
name: str = None, | |||
naming_convention: str = "direct", |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
good! please add this to our docs: that the default settings are such that data comes to sink without changing identifiers, un-nested and with dlt identifiers removed. and that it is good to push stuff to queues and REST APIs
@@ -27,6 +30,8 @@ | |||
TDestinationCallable, | |||
) | |||
|
|||
INTERNAL_MARKER = "_dlt" |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
you must use schema._dlt_tables_prefix (which may be normalized) to detect dlt identifiers. you may add such method to schema (but it will be slower to call a method)
dlt/pipeline/pipeline.py
Outdated
@@ -456,7 +456,18 @@ def normalize( | |||
return None | |||
|
|||
# make sure destination capabilities are available | |||
self._get_destination_capabilities() | |||
caps = self._get_destination_capabilities() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
the only thing you need to do is to fix NormalizersConfiguration
def on_resolved(self) -> None:
# get naming from capabilities if not present
if self.naming is None:
if self.destination_capabilities:
self.naming = self.destination_capabilities.naming_convention
detect the type of the json normalizer and apply the settings to it like the below. you can override existing settings. I think capabilities (if not None) should have precedence over the source settings.
what happens later:
when new schema is created this setting will be used
when schema is loaded - it will not but when we call update_normalize - it will. and we do that in normalizer (in schema.clone). so it should work!
propagate the max_nesting_level the correct way from the destination caps
d3c7d89
to
040ed32
Compare
040ed32
to
f6e7e6f
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I have a few additions to docs and cross references. Let's merge this first. LGTM!
): | ||
self.json_normalizer = self.json_normalizer or {} | ||
self.json_normalizer.setdefault("config", {}) | ||
self.json_normalizer["config"][ |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
this is the best we can do now. if we have more normalizers with incompatible configs then we'll need to look for something better
* removed sink mentions, fixed code snippets * rename title * trigger tests * trigger tests 2 * revert changes * small edits
Description
This PR introduces two new settings for the custom destination decorator: