Skip to content

Commit

Permalink
allow selecting tables in a schema
Browse files Browse the repository at this point in the history
  • Loading branch information
MaxHalford committed Oct 17, 2023
1 parent 1587319 commit f0103f3
Show file tree
Hide file tree
Showing 8 changed files with 209 additions and 16 deletions.
40 changes: 33 additions & 7 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,11 @@
<img src="https://github.com/carbonfact/lea/actions/workflows/code-quality.yml/badge.svg" alt="code_quality">
</a>

<!-- PyPI -->
<a href="https://pypi.org/project/lea-cli">
<img src="https://img.shields.io/pypi/v/lea-cli.svg?label=release&color=blue" alt="pypi">
</a>

<!-- License -->
<a href="https://opensource.org/license/apache-2-0/">
<img src="https://img.shields.io/github/license/carbonfact/lea" alt="license">
Expand Down Expand Up @@ -149,7 +154,26 @@ lea run --only +core.users # users and everything it depends on
lea run --only +core.users+ # users and all its dependencies
```

You may of course do combinations:
You can select all views in a schema:

```sh
lea run --only core.*
```

There are thus 8 possible operators:

```
schema.table (table by itself)
schema.table+ (table with its descendants)
+schema.table (table with its ancestors)
+schema.table+ (table with its ancestors and descendants)
schema/ (all tables in schema)
schema/+ (all tables in schema with their descendants)
+schema/ (all tables in schema with their ancestors)
+schema/+ (all tables in schema with their ancestors and descendants)
```

Combinations are possible:

```sh
lea run --only core.users+ --only +core.orders
Expand Down Expand Up @@ -268,13 +292,15 @@ lea is meant to be used as a CLI. But you can import it as a Python library too.
**Parsing a directory of queries**

```py
>>> from lea import views
>>> import lea

>>> views = views.load_views('examples/jaffle_shop/views', sqlglot_dialect='duckdb')
>>> views = lea.views.load_views('examples/jaffle_shop/views', sqlglot_dialect='duckdb')
>>> views = [v for v in views if v.schema != 'tests']
>>> for view in sorted(views, key=str):
... print(view)
... print(sorted(view.dependencies))
analytics.kpis
[('core', 'customers'), ('core', 'orders')]
core.customers
[('staging', 'customers'), ('staging', 'orders'), ('staging', 'payments')]
core.orders
Expand All @@ -291,12 +317,11 @@ staging.payments
**Organizing queries into a DAG**

```py
>>> from lea import views
>>> from lea.views import DAGOfViews
>>> import lea

>>> views = views.load_views('examples/jaffle_shop/views', sqlglot_dialect='duckdb')
>>> views = lea.views.load_views('examples/jaffle_shop/views', sqlglot_dialect='duckdb')
>>> views = [v for v in views if v.schema != 'tests']
>>> dag = DAGOfViews(views)
>>> dag = lea.views.DAGOfViews(views)
>>> while dag.is_active():
... for schema, table in sorted(dag.get_ready()):
... print(f'{schema}.{table}')
Expand All @@ -306,6 +331,7 @@ staging.orders
staging.payments
core.customers
core.orders
analytics.kpis

```

Expand Down
11 changes: 10 additions & 1 deletion examples/jaffle_shop/docs/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

## Schemas

- [`analytics`](./analytics)
- [`core`](./core)
- [`staging`](./staging)

Expand All @@ -10,16 +11,22 @@
```mermaid
%%{init: {"flowchart": {"defaultRenderer": "elk"}} }%%
flowchart TB
core(core)
analytics(analytics)
staging(staging)
core(core)
staging --> core
core --> analytics
```

## Flowchart

```mermaid
%%{init: {"flowchart": {"defaultRenderer": "elk"}} }%%
flowchart TB
subgraph analytics
analytics.kpis(kpis)
end
subgraph core
core.customers(customers)
core.orders(orders)
Expand All @@ -36,5 +43,7 @@ flowchart TB
staging.payments --> core.customers
staging.orders --> core.orders
staging.payments --> core.orders
core.customers --> analytics.kpis
core.orders --> analytics.kpis
```

20 changes: 20 additions & 0 deletions examples/jaffle_shop/docs/analytics/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
# analytics

## Table of contents

- [kpis](#kpis)

## Views

### kpis

```sql
SELECT *
FROM jaffle_shop_max.analytics__kpis
```

| Column | Type | Description | Unique |
|:---------|:----------|:--------------|:---------|
| metric | `VARCHAR` | | |
| value | `BIGINT` | | |

13 changes: 13 additions & 0 deletions examples/jaffle_shop/views/analytics/kpis.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
SELECT
'n_customers' AS metric,
COUNT(*) AS value
FROM
jaffle_shop.core__customers

UNION ALL

SELECT
'n_orders' AS metric,
COUNT(*) AS value
FROM
jaffle_shop.core__orders
125 changes: 125 additions & 0 deletions lea/app/run.py
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,131 @@ def make_blacklist(dag: lea.views.DAGOfViews, only: list) -> set:
return blacklist


def make_whitelist(query: str, dag: lea.views.DAGOfViews) -> set:
"""Make a whitelist of tables given a query.
These are the different queries to handle:
schema.table
schema.table+ (descendants)
+schema.table (ancestors)
+schema.table+ (ancestors and descendants)
schema/ (all tables in schema)
schema/+ (all tables in schema with their descendants)
+schema/ (all tables in schema with their ancestors)
+schema/+ (all tables in schema with their ancestors and descendants)
Examples
--------
>>> import lea
>>> views = lea.views.load_views('examples/jaffle_shop/views', sqlglot_dialect='duckdb')
>>> views = [v for v in views if v.schema != 'tests']
>>> dag = lea.views.DAGOfViews(views)
>>> def pprint(whitelist):
... for schema, table in sorted(whitelist):
... print(f'{schema}.{table}')
schema.table
>>> pprint(make_whitelist('staging.orders', dag))
staging.orders
schema.table+ (descendants)
>>> pprint(make_whitelist('staging.orders+', dag))
analytics.kpis
core.customers
core.orders
staging.orders
+schema.table (ancestors)
>>> pprint(make_whitelist('+core.customers', dag))
core.customers
staging.customers
staging.orders
staging.payments
+schema.table+ (ancestors and descendants)
>>> pprint(make_whitelist('+core.customers+', dag))
analytics.kpis
core.customers
staging.customers
staging.orders
staging.payments
schema/ (all tables in schema)
>>> pprint(make_whitelist('staging/', dag))
staging.customers
staging.orders
staging.payments
schema/+ (all tables in schema with their descendants)
>>> pprint(make_whitelist('staging/+', dag))
analytics.kpis
core.customers
core.orders
staging.customers
staging.orders
staging.payments
+schema/ (all tables in schema with their ancestors)
>>> pprint(make_whitelist('+core/', dag))
core.customers
core.orders
staging.customers
staging.orders
staging.payments
+schema/+ (all tables in schema with their ancestors and descendants)
>>> pprint(make_whitelist('+core/+', dag))
analytics.kpis
core.customers
core.orders
staging.customers
staging.orders
staging.payments
"""

def _yield_whitelist(query, include_ancestors, include_descendants):
if query.endswith("+"):
yield from _yield_whitelist(
query[:-1], include_ancestors=include_ancestors, include_descendants=True
)
return
if query.startswith("+"):
yield from _yield_whitelist(
query[1:], include_ancestors=True, include_descendants=include_descendants
)
return
if query.endswith("/"):
for schema, table in dag:
if schema == query[:-1]:
yield from _yield_whitelist(
f"{schema}.{table}",
include_ancestors=include_ancestors,
include_descendants=include_descendants,
)
else:
schema, table = query.split(".")
yield schema, table
if include_ancestors:
yield from dag.list_ancestors((schema, table))
if include_descendants:
yield from dag.list_descendants((schema, table))

return set(_yield_whitelist(query, include_ancestors=False, include_descendants=False))


def run(
client: lea.clients.Client,
views_dir: str,
Expand Down
12 changes: 6 additions & 6 deletions lea/views/dag.py
Original file line number Diff line number Diff line change
Expand Up @@ -56,12 +56,12 @@ def _to_mermaid_views(self):
self.dependencies.keys()
)
schema_nodes = itertools.groupby(sorted(nodes), lambda node: node[0])
for schema, nodes in schema_nodes:
for schema, nodes in sorted(schema_nodes):
out.write(f" subgraph {schema}\n")
for _, node in nodes:
for _, node in sorted(nodes):
out.write(f" {schema}.{node}({node})\n")
out.write(" end\n\n")
for dst, srcs in self.dependencies.items():
for dst, srcs in sorted(self.dependencies.items()):
dst = ".".join(dst)
for src in sorted(srcs):
src = ".".join(src)
Expand All @@ -76,10 +76,10 @@ def _to_mermaid_schemas(self):
nodes = set(node for deps in schema_dependencies.values() for node in deps) | set(
schema_dependencies.keys()
)
for node in nodes:
for node in sorted(nodes):
out.write(f" {node}({node})\n")
for dst, srcs in schema_dependencies.items():
for src in srcs:
for dst, srcs in sorted(schema_dependencies.items()):
for src in sorted(srcs):
out.write(f" {src} --> {dst}\n")
return out.getvalue()

Expand Down
2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[tool.poetry]
name = "lea-cli"
version = "0.1.0"
version = "0.2.0"
description = "A minimalist alternative to dbt"
authors = ["Max Halford <[email protected]>"]
packages = [
Expand Down
2 changes: 1 addition & 1 deletion tests/test_examples.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ def test_jaffle_shop():
# Check number of tables created
con = duckdb.connect("duckdb.db")
tables = con.sql("SELECT table_schema, table_name FROM information_schema.tables").df()
assert tables.shape[0] == 5
assert tables.shape[0] == 6

# Check number of rows in core__customers
customers = con.sql("SELECT * FROM jaffle_shop_max.core__customers").df()
Expand Down

0 comments on commit f0103f3

Please sign in to comment.