-
Notifications
You must be signed in to change notification settings - Fork 185
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Extend layout placeholder params for filesystem destinations #1149
Conversation
✅ Deploy Preview for dlt-hub-docs canceled.
|
e8b0c33
to
be088ee
Compare
destinatination_kwargs: t.Dict[str, t.Any] = {} | ||
if current_datetime: | ||
destinatination_kwargs["current_datetime"] = current_datetime | ||
|
||
if datetime_format: | ||
destinatination_kwargs["datetime_format"] = datetime_format | ||
|
||
if extra_params: | ||
destinatination_kwargs["extra_params"] = extra_params |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
OK factory
is tricky and its main goal is to delay instantiation, population of the configs until runtime (ie. load step) while the UX is like you can create destination at any moment.
look at base init
def __init__(self, **kwargs: Any) -> None:
# Create initial unresolved destination config
# Argument defaults are filtered out here because we only want arguments passed explicitly
# to supersede config from the environment or pipeline args
sig = inspect.signature(self.__class__.__init__)
params = sig.parameters
self.config_params = {
k: v for k, v in kwargs.items() if k not in params or v != params[k].default
}
it will store all unknown paramters in config_params
and then here
def configuration(self, initial_config: TDestinationConfig) -> TDestinationConfig:
"""Get a fully resolved destination config from the initial config"""
config = resolve_configuration(
initial_config,
sections=(known_sections.DESTINATION, self.destination_name),
# Already populated values will supersede resolved env config
explicit_value=self.config_params,
)
return config
whenever configuration is created, we use those params as explicit initials
outcome: you do not need any special code to handle that. just pass additional stuff to super() init and it will be pushed to config during resolution
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done.
destinatination_kwargs: t.Dict[str, t.Any] = {} | ||
if current_datetime: | ||
destinatination_kwargs["current_datetime"] = current_datetime | ||
|
||
if datetime_format: | ||
destinatination_kwargs["datetime_format"] = datetime_format | ||
|
||
if extra_params: | ||
destinatination_kwargs["extra_params"] = extra_params |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
OK factory
is tricky and its main goal is to delay instantiation, population of the configs until runtime (ie. load step) while the UX is like you can create destination at any moment.
look at base init
def __init__(self, **kwargs: Any) -> None:
# Create initial unresolved destination config
# Argument defaults are filtered out here because we only want arguments passed explicitly
# to supersede config from the environment or pipeline args
sig = inspect.signature(self.__class__.__init__)
params = sig.parameters
self.config_params = {
k: v for k, v in kwargs.items() if k not in params or v != params[k].default
}
it will store all unknown paramters in config_params
and then here
def configuration(self, initial_config: TDestinationConfig) -> TDestinationConfig:
"""Get a fully resolved destination config from the initial config"""
config = resolve_configuration(
initial_config,
sections=(known_sections.DESTINATION, self.destination_name),
# Already populated values will supersede resolved env config
explicit_value=self.config_params,
)
return config
whenever configuration is created, we use those params as explicit initials
outcome: you do not need any special code to handle that. just pass additional stuff to super() init and it will be pushed to config during resolution
self.PROTOCOL_CREDENTIALS.get(self.protocol) or Optional[CredentialsConfiguration] # type: ignore[return-value] | ||
) | ||
|
||
def on_resolved(self) -> None: |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
you do not need this method at all. config will be populated automatically
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
OK you need it to validate the layout. at this moment you know all the config values. some come from providers some from factory,
so here you need to validate the layout. see if all placeholders are known. if not we need a really good explanation which are missing etc.
self.destination_file_name = make_filename(config, file_name, schema_name, load_id) | ||
|
||
# Make sure directory exists before moving file | ||
dir_path = self.make_remote_path(only_dir=True) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
don't do that. fsspec
does that in put_file
. this code is not needed. what was your reason to add it?
OFC this smells like race conditions that's why we try to "create" dirs in initialize. btw. buckets do not have dirs really
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@rudolfix the strange thing was that ffspec didn't create directory that's why I added it, I will remove this and see if test pipeline will work out.
|
||
|
||
def get_table_prefix_layout( | ||
config: FilesystemDestinationClientConfiguration, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
hopefully this will merge into one nice path_utils or layout.py :)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes layout.py
is temporary thing to keep things clean and move things to path_utils
before merging.
) -> None: | ||
self.params = params | ||
self.allowed_placeholders = allowed_placeholders.copy() | ||
self.layout_placeholders = re.findall(r"\{(.*?)\}", path_layout) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
what's wrong with using path_utils
method?
if self.config.extra_params: | ||
for key, value in self.config.extra_params.items(): | ||
if callable(value): | ||
self._params[key] = value( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
here you must catch TypeError which indicates a wrong signature and explain what the correct signature is. otherwise people will be confused
ideally you could inspect all the signatures when validating placeholders
"{table_name}/{year}/{month}/{day}/{load_id}.{file_id}.{ext}", | ||
"{table_name}/{day}/{hour}/{minute}/{load_id}.{file_id}.{ext}", | ||
"{table_name}/{timestamp}/{load_id}.{file_id}.{ext}", | ||
"{table_name}/{dow}/{load_id}.{file_id}.{ext}", |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
also add
{YY}/{MM}/{DD}/{table_name}/...
and see where it fails
super().__init__(schema, config) | ||
self.fs_client, self.fs_path = fsspec_from_config(config) | ||
self.config: FilesystemDestinationClientConfiguration = config | ||
# verify files layout. we need {table_name} and only allow {schema_name} before it, otherwise tables | ||
# cannot be replaced and we cannot initialize folders consistently | ||
self.table_prefix_layout = path_utils.get_table_prefix_layout(config.layout) | ||
self.table_prefix_layout = get_table_prefix_layout(config) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'll comment on that in next review. this is a bigger chunk. we want to support all paths in append-only and require old layout only when we replace data (we can't guarantee replacement otherwise)
Closing this in favor of #1182 |
This PR addresses #930
Requirements:
@sh-rp recommends to test with staging destination
you’ll have to test it, write a test where you run the destination settings that use staging and add a map function for the filename
TODO