diff --git a/lea/app/run.py b/lea/app/run.py index 2997287..3b39ec1 100644 --- a/lea/app/run.py +++ b/lea/app/run.py @@ -197,7 +197,9 @@ def run( whitelist = ( # If multiple select queries are provided, we want to run the union of all the views they # select. We use a set union to remove duplicates. - set.union(*(make_whitelist(query, dag) for query in select)) if select else set(dag.keys()) + set.union(*(make_whitelist(query, dag) for query in select)) + if select + else set(dag.keys()) ) console_log(f"{len(whitelist):,d} view(s) selected") diff --git a/lea/views/dag.py b/lea/views/dag.py index be3b199..74e9236 100644 --- a/lea/views/dag.py +++ b/lea/views/dag.py @@ -48,19 +48,46 @@ def _list_descendants(node): return list(_list_descendants(node)) - def _build_nested_schema(self, deps): + @property + def _nested_schema(self): + """ + + >>> import pathlib + >>> import lea + >>> from pprint import pprint + + >>> views_dir = pathlib.Path(__file__).parent.parent.parent / "examples" / "jaffle_shop" / "views" + >>> views = lea.views.load_views(views_dir, sqlglot_dialect="duckdb") + >>> views = [view for view in views if view.schema not in {"tests"}] + >>> dag = lea.views.DAGOfViews(views) + + >>> pprint(dag._nested_schema) + {'analytics': {'finance': {'kpis': {}}, 'kpis': {}}, + 'core': {'customers': {}, 'orders': {}}, + 'staging': {'customers': {}, 'orders': {}, 'payments': {}}} + + """ + + nodes = set(node for deps in self.dependencies.values() for node in deps) | set( + self.dependencies.keys() + ) + nested_schema = {} - for path in deps: + for key in nodes: current_level = nested_schema - for part in path: + for part in key: if part not in current_level: current_level[part] = {} current_level = current_level[part] return nested_schema - def _to_mermaid_subgraphs(self, out, nested_schema: dict): + def _to_mermaid_views(self): + out = io.StringIO() + out.write('%%{init: {"flowchart": {"defaultRenderer": "elk"}} }%%\n') + out.write("flowchart TB\n") + def output_subgraph(schema: str, values: dict, prefix: str = ""): out.write(f"\n{FOUR_SPACES}subgraph {schema}\n") for value in sorted(values.keys()): @@ -73,26 +100,19 @@ def output_subgraph(schema: str, values: dict, prefix: str = ""): out.write(f"{FOUR_SPACES*2}{full_path}({value})\n") out.write(f"{FOUR_SPACES}end\n\n") + # Print out the nodes, within each subgraph block + nested_schema = self._nested_schema for schema in sorted(nested_schema.keys()): values = nested_schema[schema] output_subgraph(schema, values) - def _to_mermaid_views(self): - out = io.StringIO() - out.write('%%{init: {"flowchart": {"defaultRenderer": "elk"}} }%%\n') - out.write("flowchart TB\n") - nodes = set(node for deps in self.dependencies.values() for node in deps) | set( - self.dependencies.keys() - ) - - nested_schema = self._build_nested_schema(nodes) - self._to_mermaid_subgraphs(out, nested_schema) - + # Print out the edges for dst, srcs in sorted(self.dependencies.items()): dst = ".".join(dst) for src in sorted(srcs): src = ".".join(src) out.write(f"{FOUR_SPACES}{src} --> {dst}\n") + return out.getvalue() def _to_mermaid_schemas(self):