From f0103f307e9527f4bdee69a9f24aa718f4578970 Mon Sep 17 00:00:00 2001 From: Max Halford Date: Tue, 17 Oct 2023 17:35:47 +0200 Subject: [PATCH] allow selecting tables in a schema --- README.md | 40 +++++- examples/jaffle_shop/docs/README.md | 11 +- examples/jaffle_shop/docs/analytics/README.md | 20 +++ examples/jaffle_shop/views/analytics/kpis.sql | 13 ++ lea/app/run.py | 125 ++++++++++++++++++ lea/views/dag.py | 12 +- pyproject.toml | 2 +- tests/test_examples.py | 2 +- 8 files changed, 209 insertions(+), 16 deletions(-) create mode 100644 examples/jaffle_shop/docs/analytics/README.md create mode 100644 examples/jaffle_shop/views/analytics/kpis.sql diff --git a/README.md b/README.md index 3e6a98b..5fae180 100644 --- a/README.md +++ b/README.md @@ -13,6 +13,11 @@ code_quality + + + pypi + + license @@ -149,7 +154,26 @@ lea run --only +core.users # users and everything it depends on lea run --only +core.users+ # users and all its dependencies ``` -You may of course do combinations: +You can select all views in a schema: + +```sh +lea run --only core.* +``` + +There are thus 8 possible operators: + +``` +schema.table (table by itself) +schema.table+ (table with its descendants) ++schema.table (table with its ancestors) ++schema.table+ (table with its ancestors and descendants) +schema/ (all tables in schema) +schema/+ (all tables in schema with their descendants) ++schema/ (all tables in schema with their ancestors) ++schema/+ (all tables in schema with their ancestors and descendants) +``` + +Combinations are possible: ```sh lea run --only core.users+ --only +core.orders @@ -268,13 +292,15 @@ lea is meant to be used as a CLI. But you can import it as a Python library too. **Parsing a directory of queries** ```py ->>> from lea import views +>>> import lea ->>> views = views.load_views('examples/jaffle_shop/views', sqlglot_dialect='duckdb') +>>> views = lea.views.load_views('examples/jaffle_shop/views', sqlglot_dialect='duckdb') >>> views = [v for v in views if v.schema != 'tests'] >>> for view in sorted(views, key=str): ... print(view) ... print(sorted(view.dependencies)) +analytics.kpis +[('core', 'customers'), ('core', 'orders')] core.customers [('staging', 'customers'), ('staging', 'orders'), ('staging', 'payments')] core.orders @@ -291,12 +317,11 @@ staging.payments **Organizing queries into a DAG** ```py ->>> from lea import views ->>> from lea.views import DAGOfViews +>>> import lea ->>> views = views.load_views('examples/jaffle_shop/views', sqlglot_dialect='duckdb') +>>> views = lea.views.load_views('examples/jaffle_shop/views', sqlglot_dialect='duckdb') >>> views = [v for v in views if v.schema != 'tests'] ->>> dag = DAGOfViews(views) +>>> dag = lea.views.DAGOfViews(views) >>> while dag.is_active(): ... for schema, table in sorted(dag.get_ready()): ... print(f'{schema}.{table}') @@ -306,6 +331,7 @@ staging.orders staging.payments core.customers core.orders +analytics.kpis ``` diff --git a/examples/jaffle_shop/docs/README.md b/examples/jaffle_shop/docs/README.md index e5553bf..4d2ab9a 100644 --- a/examples/jaffle_shop/docs/README.md +++ b/examples/jaffle_shop/docs/README.md @@ -2,6 +2,7 @@ ## Schemas +- [`analytics`](./analytics) - [`core`](./core) - [`staging`](./staging) @@ -10,9 +11,11 @@ ```mermaid %%{init: {"flowchart": {"defaultRenderer": "elk"}} }%% flowchart TB - core(core) + analytics(analytics) staging(staging) + core(core) staging --> core + core --> analytics ``` ## Flowchart @@ -20,6 +23,10 @@ flowchart TB ```mermaid %%{init: {"flowchart": {"defaultRenderer": "elk"}} }%% flowchart TB + subgraph analytics + analytics.kpis(kpis) + end + subgraph core core.customers(customers) core.orders(orders) @@ -36,5 +43,7 @@ flowchart TB staging.payments --> core.customers staging.orders --> core.orders staging.payments --> core.orders + core.customers --> analytics.kpis + core.orders --> analytics.kpis ``` diff --git a/examples/jaffle_shop/docs/analytics/README.md b/examples/jaffle_shop/docs/analytics/README.md new file mode 100644 index 0000000..c0325c1 --- /dev/null +++ b/examples/jaffle_shop/docs/analytics/README.md @@ -0,0 +1,20 @@ +# analytics + +## Table of contents + +- [kpis](#kpis) + +## Views + +### kpis + +```sql +SELECT * +FROM jaffle_shop_max.analytics__kpis +``` + +| Column | Type | Description | Unique | +|:---------|:----------|:--------------|:---------| +| metric | `VARCHAR` | | | +| value | `BIGINT` | | | + diff --git a/examples/jaffle_shop/views/analytics/kpis.sql b/examples/jaffle_shop/views/analytics/kpis.sql new file mode 100644 index 0000000..ef3b5ba --- /dev/null +++ b/examples/jaffle_shop/views/analytics/kpis.sql @@ -0,0 +1,13 @@ +SELECT + 'n_customers' AS metric, + COUNT(*) AS value +FROM + jaffle_shop.core__customers + +UNION ALL + +SELECT + 'n_orders' AS metric, + COUNT(*) AS value +FROM + jaffle_shop.core__orders diff --git a/lea/app/run.py b/lea/app/run.py index 44c606f..9e3015f 100644 --- a/lea/app/run.py +++ b/lea/app/run.py @@ -64,6 +64,131 @@ def make_blacklist(dag: lea.views.DAGOfViews, only: list) -> set: return blacklist +def make_whitelist(query: str, dag: lea.views.DAGOfViews) -> set: + """Make a whitelist of tables given a query. + + These are the different queries to handle: + + schema.table + schema.table+ (descendants) + +schema.table (ancestors) + +schema.table+ (ancestors and descendants) + schema/ (all tables in schema) + schema/+ (all tables in schema with their descendants) + +schema/ (all tables in schema with their ancestors) + +schema/+ (all tables in schema with their ancestors and descendants) + + Examples + -------- + + >>> import lea + + >>> views = lea.views.load_views('examples/jaffle_shop/views', sqlglot_dialect='duckdb') + >>> views = [v for v in views if v.schema != 'tests'] + >>> dag = lea.views.DAGOfViews(views) + + >>> def pprint(whitelist): + ... for schema, table in sorted(whitelist): + ... print(f'{schema}.{table}') + + schema.table + + >>> pprint(make_whitelist('staging.orders', dag)) + staging.orders + + schema.table+ (descendants) + + >>> pprint(make_whitelist('staging.orders+', dag)) + analytics.kpis + core.customers + core.orders + staging.orders + + +schema.table (ancestors) + + >>> pprint(make_whitelist('+core.customers', dag)) + core.customers + staging.customers + staging.orders + staging.payments + + +schema.table+ (ancestors and descendants) + + >>> pprint(make_whitelist('+core.customers+', dag)) + analytics.kpis + core.customers + staging.customers + staging.orders + staging.payments + + schema/ (all tables in schema) + + >>> pprint(make_whitelist('staging/', dag)) + staging.customers + staging.orders + staging.payments + + schema/+ (all tables in schema with their descendants) + + >>> pprint(make_whitelist('staging/+', dag)) + analytics.kpis + core.customers + core.orders + staging.customers + staging.orders + staging.payments + + +schema/ (all tables in schema with their ancestors) + + >>> pprint(make_whitelist('+core/', dag)) + core.customers + core.orders + staging.customers + staging.orders + staging.payments + + +schema/+ (all tables in schema with their ancestors and descendants) + + >>> pprint(make_whitelist('+core/+', dag)) + analytics.kpis + core.customers + core.orders + staging.customers + staging.orders + staging.payments + + """ + + def _yield_whitelist(query, include_ancestors, include_descendants): + if query.endswith("+"): + yield from _yield_whitelist( + query[:-1], include_ancestors=include_ancestors, include_descendants=True + ) + return + if query.startswith("+"): + yield from _yield_whitelist( + query[1:], include_ancestors=True, include_descendants=include_descendants + ) + return + if query.endswith("/"): + for schema, table in dag: + if schema == query[:-1]: + yield from _yield_whitelist( + f"{schema}.{table}", + include_ancestors=include_ancestors, + include_descendants=include_descendants, + ) + else: + schema, table = query.split(".") + yield schema, table + if include_ancestors: + yield from dag.list_ancestors((schema, table)) + if include_descendants: + yield from dag.list_descendants((schema, table)) + + return set(_yield_whitelist(query, include_ancestors=False, include_descendants=False)) + + def run( client: lea.clients.Client, views_dir: str, diff --git a/lea/views/dag.py b/lea/views/dag.py index 8deaf5c..f31839f 100644 --- a/lea/views/dag.py +++ b/lea/views/dag.py @@ -56,12 +56,12 @@ def _to_mermaid_views(self): self.dependencies.keys() ) schema_nodes = itertools.groupby(sorted(nodes), lambda node: node[0]) - for schema, nodes in schema_nodes: + for schema, nodes in sorted(schema_nodes): out.write(f" subgraph {schema}\n") - for _, node in nodes: + for _, node in sorted(nodes): out.write(f" {schema}.{node}({node})\n") out.write(" end\n\n") - for dst, srcs in self.dependencies.items(): + for dst, srcs in sorted(self.dependencies.items()): dst = ".".join(dst) for src in sorted(srcs): src = ".".join(src) @@ -76,10 +76,10 @@ def _to_mermaid_schemas(self): nodes = set(node for deps in schema_dependencies.values() for node in deps) | set( schema_dependencies.keys() ) - for node in nodes: + for node in sorted(nodes): out.write(f" {node}({node})\n") - for dst, srcs in schema_dependencies.items(): - for src in srcs: + for dst, srcs in sorted(schema_dependencies.items()): + for src in sorted(srcs): out.write(f" {src} --> {dst}\n") return out.getvalue() diff --git a/pyproject.toml b/pyproject.toml index ac49f79..4fd871d 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -1,6 +1,6 @@ [tool.poetry] name = "lea-cli" -version = "0.1.0" +version = "0.2.0" description = "A minimalist alternative to dbt" authors = ["Max Halford "] packages = [ diff --git a/tests/test_examples.py b/tests/test_examples.py index 107c405..21d129c 100644 --- a/tests/test_examples.py +++ b/tests/test_examples.py @@ -38,7 +38,7 @@ def test_jaffle_shop(): # Check number of tables created con = duckdb.connect("duckdb.db") tables = con.sql("SELECT table_schema, table_name FROM information_schema.tables").df() - assert tables.shape[0] == 5 + assert tables.shape[0] == 6 # Check number of rows in core__customers customers = con.sql("SELECT * FROM jaffle_shop_max.core__customers").df()