Skip to content

Commit

Permalink
cleaning
Browse files Browse the repository at this point in the history
  • Loading branch information
arthurprevot committed Oct 13, 2024
1 parent 73972f1 commit 6ebcdbd
Showing 1 changed file with 0 additions and 22 deletions.
22 changes: 0 additions & 22 deletions yaetos/etl_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -358,15 +358,7 @@ def load_input(self, input_name):
path = self.jargs.inputs[input_name]['path']
path = path.replace('s3://', 's3a://') if 'dev_local' in self.jargs.mode.split(',') else path
logger.info("Input '{}' to be loaded from files '{}'.".format(input_name, path))

# # Get base_path. TODO: remove section (and all name_base_in_param and name_base_out_param) now that it is done with replace_placeholders
# if self.jargs.merged_args.get('name_base_in_param'):
# base_path = self.jargs.merged_args[self.jargs.merged_args.get('name_base_in_param')]
# path = path.replace('{' + self.jargs.merged_args.get('name_base_in_param') + '}', '{{base_path}}')
# else:
# base_path = self.jargs.merged_args['base_path']
base_path = self.jargs.merged_args['base_path']

path = Path_Handler(path, base_path, self.jargs.merged_args.get('root_path')).expand_latest()
self.jargs.inputs[input_name]['path_expanded'] = path

Expand Down Expand Up @@ -500,14 +492,7 @@ def expand_params(jargs, **kwargs):

def expand_input_path(self, path, **kwargs):
# Function call isolated to be overridable.
# # Get base_path. TODO: centralize
# if self.jargs.merged_args.get('name_base_in_param'):
# base_path = self.jargs.merged_args[self.jargs.merged_args.get('name_base_in_param')]
# path = path.replace('{' + self.jargs.merged_args.get('name_base_in_param') + '}', '{{base_path}}')
# else:
# base_path = self.jargs.merged_args['base_path']
base_path = self.jargs.merged_args['base_path']

return Path_Handler(path, base_path, self.jargs.merged_args.get('root_path')).expand_latest()

def expand_output_path(self, path, now_dt, **kwargs):
Expand Down Expand Up @@ -1014,13 +999,6 @@ def update_args(self, args, loaded_inputs):
if args.get('output'):
args['output']['type'] = args.pop('output.type', None) or args['output'].get('type', 'none')
if args.get('spark_app_args'): # hack to have scala sample job working. TODO: remove hardcoded case when made more generic

# Get base_path. TODO: remove section (and all name_base_in_param and name_base_out_param) now that it is done with replace_placeholders
# if args.get('name_base_in_param'): # TODO: check if requires name_base_in_param or name_base_out_param
# base_path = args[args.get('name_base_in_param')]
# args['spark_app_args'] = args['spark_app_args'].replace('{' + self.jargs.merged_args.get('name_base_in_param') + '}', '{{base_path}}')
# else:
# base_path = args.get('base_path')
base_path = args.get('base_path')
args['spark_app_args'] = Path_Handler(args['spark_app_args'], base_path, args.get('root_path')).path # TODO: remove root_path since it is now done with replace_placeholders

Expand Down

0 comments on commit 6ebcdbd

Please sign in to comment.