diff --git a/yaetos/etl_utils.py b/yaetos/etl_utils.py index e4e76992..e4897cec 100644 --- a/yaetos/etl_utils.py +++ b/yaetos/etl_utils.py @@ -1109,13 +1109,16 @@ def __init__(self, launch_jargs, app_name): df = self.create_connections_jobs(launch_jargs.storage, launch_jargs.merged_args) logger.debug('Flow app_name : {}, connection_table: {}'.format(app_name, df)) graph = self.create_global_graph(df) # top to bottom - tree = self.create_local_tree(graph, nx.DiGraph(), app_name) # bottom to top - self.leafs = self.get_leafs(tree, leafs=[]) # bottom to top + if graph.has_node(app_name): + tree = self.create_local_tree(graph, nx.DiGraph(), app_name) # bottom to top + self.leafs = self.get_leafs(tree, leafs=[]) # bottom to top + launch_jargs.cmd_args.pop('job_name', None) # removing since it should be pulled from yml and not be overriden by cmd_args. + launch_jargs.job_args.pop('job_name', None) # same + else: + self.leafs = [app_name] logger.info('Sequence of jobs to be run: {}'.format(self.leafs)) logger.info('-' * 80) logger.info('-') - launch_jargs.cmd_args.pop('job_name', None) # removing since it should be pulled from yml and not be overriden by cmd_args. - launch_jargs.job_args.pop('job_name', None) # same self.launch_jargs = launch_jargs def run_pipeline(self, sc, sc_sql):