Commit: 🧹
MaxHalford committed Oct 16, 2023
1 parent bfa28a7 commit 4c270d4
Showing 20 changed files with 196 additions and 118 deletions.
3 changes: 3 additions & 0 deletions .gitmodules
@@ -0,0 +1,3 @@
+[submodule "examples/jaffle_shop/jaffle_shop"]
+	path = examples/jaffle_shop/jaffle_shop
+	url = https://github.com/dbt-labs/jaffle_shop/
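This commit vendors the jaffle_shop example as a git submodule, so existing clones need `git submodule update --init` (or a fresh `git clone --recurse-submodules`) to fetch it.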
23 changes: 23 additions & 0 deletions .pre-commit-config.yaml
@@ -0,0 +1,23 @@
+files: .
+repos:
+  - repo: https://github.com/pre-commit/pre-commit-hooks
+    rev: v4.4.0
+    hooks:
+      - id: check-json
+      - id: check-yaml
+      - id: trailing-whitespace
+      - id: mixed-line-ending
+
+  - repo: local
+    hooks:
+      - id: black
+        name: black
+        language: python
+        types: [python]
+        entry: black
+
+      - id: ruff
+        name: ruff
+        language: python
+        types: [python]
+        entry: ruff
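The `repo: local` entries invoke `black` and `ruff` directly instead of pulling them from their upstream hook repositories; a single hook can be run on demand with, e.g., `pre-commit run ruff --all-files`.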
45 changes: 45 additions & 0 deletions CONTRIBUTING.md
@@ -0,0 +1,45 @@
# Contributing

## Setup

Start by cloning the repository:

```sh
git clone https://github.com/carbonfact/lea
```

Next, you'll need a Python environment:

```sh
pyenv install -v 3.11
```

You'll also need [Poetry](https://python-poetry.org/):

```sh
curl -sSL https://install.python-poetry.org | python3 -
poetry install
poetry shell
```

## Testing

You can run tests once the environment is set up:

```sh
pytest
```

## Code quality

Install the code quality routine so that it runs each time you try to push your commits.

```sh
pre-commit install --hook-type pre-push
```

You can also run the code quality routine ad hoc.

```sh
pre-commit run --all-files
```
2 changes: 1 addition & 1 deletion examples/jaffle_shop/docs/README.md
@@ -10,8 +10,8 @@
 ```mermaid
 %%{init: {"flowchart": {"defaultRenderer": "elk"}} }%%
 flowchart TB
-staging(staging)
 core(core)
+staging(staging)
 staging --> core
 ```
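Only the declaration order of the `staging` and `core` nodes changes here; the edge, and hence the graph, is the same.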

1 change: 1 addition & 0 deletions examples/jaffle_shop/jaffle_shop
Submodule jaffle_shop added at b0b77a
20 changes: 10 additions & 10 deletions lea/app/__init__.py
@@ -28,18 +28,14 @@ def env_validate_callback(env_path: str | None):

 @app.command()
 def prepare(production: bool = False, env: str = EnvPath):
-    """
-    """
+    """ """
     client = _make_client(production)
     client.prepare(console)


 @app.command()
 def teardown(production: bool = False, env: str = EnvPath):
-    """
-    """
+    """ """

     if production:
         raise ValueError("This is a dangerous operation, so it is not allowed in production.")
@@ -61,7 +57,7 @@ def run(
     threads: int = 8,
     show: int = 20,
     raise_exceptions: bool = False,
-    env: str = EnvPath
+    env: str = EnvPath,
 ):
     from lea.app.run import run
@@ -92,7 +88,7 @@ def test(
     threads: int = 8,
     production: bool = False,
     raise_exceptions: bool = False,
-    env: str = EnvPath
+    env: str = EnvPath,
 ):
     from lea.app.test import test
@@ -110,15 +106,19 @@ def test(


 @app.command()
-def docs(views_dir: str = ViewsDir, output_dir: str = "docs", production: bool = False, env: str = EnvPath):
+def docs(
+    views_dir: str = ViewsDir,
+    output_dir: str = "docs",
+    production: bool = False,
+    env: str = EnvPath,
+):
     from lea.app.docs import docs

     client = _make_client(production=production)

     docs(views_dir=views_dir, output_dir=output_dir, client=client, console=console)


-
 @app.command()
 def diff(origin: str, destination: str, env: str = EnvPath):
     from lea.app.diff import calculate_diff
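The recurring change in this file is black's trailing-comma convention: multi-line signatures gain a comma after the last parameter (`env: str = EnvPath,`), and the `docs(...)` one-liner, being over black's 88-character default line length, is exploded to one parameter per line.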
16 changes: 4 additions & 12 deletions lea/app/diff.py
@@ -7,28 +7,20 @@


 def calculate_diff(origin: str, destination: str, client: lea.clients.Client) -> str:
-    diff_table = client.get_diff_summary(
-        origin=origin, destination=destination
-    )
+    diff_table = client.get_diff_summary(origin=origin, destination=destination)
     if diff_table.empty:
         return "No field additions or removals detected"

     removed_tables = set(
-        diff_table[
-            diff_table.column.isnull() & (diff_table.diff_kind == "REMOVED")
-        ].table
+        diff_table[diff_table.column.isnull() & (diff_table.diff_kind == "REMOVED")].table
     )
     added_tables = set(
-        diff_table[
-            diff_table.column.isnull() & (diff_table.diff_kind == "ADDED")
-        ].table
+        diff_table[diff_table.column.isnull() & (diff_table.diff_kind == "ADDED")].table
     )

     buffer = io.StringIO()
     print_ = functools.partial(print, file=buffer)
-    for table, columns in diff_table[diff_table.column.notnull()].groupby(
-        "table"
-    ):
+    for table, columns in diff_table[diff_table.column.notnull()].groupby("table"):
         if table in removed_tables:
             print_(f"- {table}")
         elif table in added_tables:
9 changes: 2 additions & 7 deletions lea/app/docs.py
@@ -62,16 +62,11 @@ def docs(

         # Write down the query
         content.write(
-            "```sql\n"
-            "SELECT *\n"
-            f"FROM {client._make_view_path(view)}\n"
-            "```\n\n"
+            "```sql\n" "SELECT *\n" f"FROM {client._make_view_path(view)}\n" "```\n\n"
         )
         # Write down the columns
         view_columns = columns.query(f"table == '{schema}__{view.name}'")[["column", "type"]]
-        view_comments = view.extract_comments(
-            columns=view_columns["column"].tolist()
-        )
+        view_comments = view.extract_comments(columns=view_columns["column"].tolist())
         view_columns["Description"] = (
             view_columns["column"]
             .map(
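The collapsed `content.write(...)` call is safe because Python concatenates adjacent string literals (plain and f-strings alike) at compile time; a quick self-contained check of that behaviour:

```python
# Adjacent string literals concatenate at compile time, so black's
# one-line form is equivalent to the original multi-line version.
view_path = "analytics.core__users"  # placeholder for client._make_view_path(view)
query = "SELECT *\n" f"FROM {view_path}\n"
assert query == "SELECT *\nFROM analytics.core__users\n"
```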
23 changes: 8 additions & 15 deletions lea/app/run.py
@@ -49,12 +49,12 @@ def make_blacklist(dag: lea.views.DAGOfViews, only: list) -> set:

     for schema, table in only:
         # Ancestors
-        if schema.startswith('+'):
+        if schema.startswith("+"):
             blacklist.difference_update(dag.list_ancestors((schema[1:], table)))
             schema = schema[1:]

         # Descendants
-        if table.endswith('+'):
+        if table.endswith("+"):
             blacklist.difference_update(dag.list_descendants((schema, table[:-1])))
             table = table[:-1]
@@ -138,17 +138,13 @@ def display_progress() -> rich.table.Table:
     exceptions = {}
     skipped = set()
     cache_path = pathlib.Path(".cache.pkl")
-    cache = (
-        set()
-        if fresh or not cache_path.exists()
-        else pickle.loads(cache_path.read_bytes())
-    )
+    cache = set() if fresh or not cache_path.exists() else pickle.loads(cache_path.read_bytes())
     tic = time.time()

     console_log(f"{len(cache):,d} view(s) already done")

     with rich.live.Live(
-        display_progress() , vertical_overflow="visible", refresh_per_second=6
+        display_progress(), vertical_overflow="visible", refresh_per_second=6
     ) as live:
         while dag.is_active():
             # We check if new views have been unlocked
@@ -169,18 +165,16 @@

             # A node can only be computed if all its dependencies have been computed
             # If all the dependencies have not been computed succesfully, we skip the node
-            if any(
-                dep in skipped or dep in exceptions
-                for dep in dag[node].dependencies
-            ):
+            if any(dep in skipped or dep in exceptions for dep in dag[node].dependencies):
                 skipped.add(node)
                 dag.done(node)
                 continue

             jobs[node] = executor.submit(
                 _do_nothing
                 if dry or node in cache
-                else functools.partial(pretty_print_view, view=dag[node], console=console) if print_to_cli
+                else functools.partial(pretty_print_view, view=dag[node], console=console)
+                if print_to_cli
                 else functools.partial(client.create, view=dag[node])
             )
             jobs_started_at[node] = dt.datetime.now()
@@ -200,8 +194,7 @@ def display_progress() -> rich.table.Table:
         cache = (
             set()
             if all_done
-            else cache
-            | {node for node in order if node not in exceptions and node not in skipped}
+            else cache | {node for node in order if node not in exceptions and node not in skipped}
         )
         if cache:
             cache_path.write_bytes(pickle.dumps(cache))
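For context, this is the resumable-run pattern being reformatted: views that built successfully are pickled to `.cache.pkl`, and the next run reloads them unless it starts fresh. A minimal standalone sketch (node names made up):

```python
import pathlib
import pickle

cache_path = pathlib.Path(".cache.pkl")
fresh = False  # a fresh run ignores any previous cache

# Reload previously completed views, as in the hunk above.
cache = set() if fresh or not cache_path.exists() else pickle.loads(cache_path.read_bytes())

# ... the DAG runs here, recording failures and skips ...
order = [("staging", "orders"), ("core", "orders")]
exceptions, skipped = {}, set()

# Persist every node that succeeded so the next run can skip it.
cache |= {node for node in order if node not in exceptions and node not in skipped}
if cache:
    cache_path.write_bytes(pickle.dumps(cache))
```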
9 changes: 2 additions & 7 deletions lea/app/test.py
@@ -26,9 +26,7 @@ def test(

     generic_tests = []
     for view in views:
-        view_columns = columns.query(f"table == '{view.schema}__{view.name}'")[
-            "column"
-        ].tolist()
+        view_columns = columns.query(f"table == '{view.schema}__{view.name}'")["column"].tolist()
         for generic_test in client.yield_unit_tests(view=view, view_columns=view_columns):
             generic_tests.append(generic_test)
     console.log(f"Found {len(generic_tests):,d} generic tests")
@@ -40,10 +38,7 @@
     tests = [test for test in tests if test.name not in blacklist]

     with concurrent.futures.ThreadPoolExecutor(max_workers=threads) as executor:
-        jobs = {
-            executor.submit(client.load, test): test
-            for test in tests
-        }
+        jobs = {executor.submit(client.load, test): test for test in tests}
         for job in concurrent.futures.as_completed(jobs):
             test = jobs[job]
             conflicts = job.result()
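The collapsed dict comprehension is the standard submit/as_completed pattern from `concurrent.futures`; a self-contained sketch with a stand-in for `client.load`:

```python
import concurrent.futures

def load(test):  # stand-in for client.load, which returns any conflicts found
    return []

tests = ["test_orders_id_unique", "test_orders_id_not_null"]

with concurrent.futures.ThreadPoolExecutor(max_workers=8) as executor:
    jobs = {executor.submit(load, test): test for test in tests}
    for job in concurrent.futures.as_completed(jobs):
        test = jobs[job]  # map the finished future back to its test
        conflicts = job.result()
        print(test, "->", "PASS" if not conflicts else conflicts)
```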
1 change: 1 addition & 0 deletions lea/clients/__init__.py
@@ -27,6 +27,7 @@ def make_client(production: bool):
         )
     elif warehouse == "duckdb":
         from lea.clients.duckdb import DuckDB
+
         return DuckDB(
             path=os.environ["LEA_DUCKDB_PATH"],
             schema=os.environ["LEA_SCHEMA"],
28 changes: 17 additions & 11 deletions lea/clients/base.py
@@ -80,8 +80,15 @@ def get_columns(self, schema: str) -> pd.DataFrame:

     def get_diff_summary(self, origin: str, destination: str) -> pd.DataFrame:

-        origin_columns = set(map(tuple, self.get_columns(origin)[["table", "column"]].values.tolist()))
-        destination_columns = set(map(tuple, self.get_columns(destination)[["table", "column"]].values.tolist()))
+        origin_columns = set(
+            map(tuple, self.get_columns(origin)[["table", "column"]].values.tolist())
+        )
+        destination_columns = set(
+            map(
+                tuple,
+                self.get_columns(destination)[["table", "column"]].values.tolist(),
+            )
+        )

         return pd.DataFrame(
             [
@@ -90,25 +97,25 @@ def get_diff_summary(self, origin: str, destination: str) -> pd.DataFrame:
                     "column": None,
                     "diff_kind": "ADDED",
                 }
-                for table in {t for t, _ in origin_columns} - {t for t, _ in destination_columns}
-            ] +
-            [
+                for table in {t for t, _ in origin_columns} - {t for t, _ in destination_columns}
+            ]
+            + [
                 {
                     "table": table,
                     "column": column,
                     "diff_kind": "ADDED",
                 }
                 for table, column in origin_columns - destination_columns
-            ] +
-            [
+            ]
+            + [
                 {
                     "table": table,
                     "column": None,
                     "diff_kind": "REMOVED",
                 }
-                for table in {t for t, _ in destination_columns } - {t for t, _ in origin_columns}
-            ] +
-            [
+                for table in {t for t, _ in destination_columns} - {t for t, _ in origin_columns}
+            ]
+            + [
                 {
                     "table": table,
                     "column": column,
@@ -118,7 +125,6 @@ def get_diff_summary(self, origin: str, destination: str) -> pd.DataFrame:
             ]
         )

-
     @abc.abstractmethod
     def make_test_unique_column(self, view: views.View, column: str) -> str:
         ...
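All of the reformatted comprehensions derive from plain set arithmetic on `(table, column)` pairs; a small sketch with made-up columns:

```python
# Illustrative (table, column) pairs for two environments.
origin_columns = {("core__users", "id"), ("core__users", "email")}
destination_columns = {("core__users", "id"), ("core__orders", "id")}

added = origin_columns - destination_columns    # diff_kind == "ADDED"
removed = destination_columns - origin_columns  # diff_kind == "REMOVED"
added_tables = {t for t, _ in origin_columns} - {t for t, _ in destination_columns}
removed_tables = {t for t, _ in destination_columns} - {t for t, _ in origin_columns}

print(added)           # {('core__users', 'email')}
print(removed)         # {('core__orders', 'id')}
print(added_tables)    # set()
print(removed_tables)  # {'core__orders'}
```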
17 changes: 7 additions & 10 deletions lea/clients/bigquery.py
@@ -29,15 +29,14 @@ def sqlglot_dialect(self):

     @property
     def dataset_name(self):
-        return (
-            f"{self._dataset_name}_{self.username}"
-            if self.username
-            else self._dataset_name
-        )
+        return f"{self._dataset_name}_{self.username}" if self.username else self._dataset_name

     def create_dataset(self):
         from google.cloud import bigquery
-        dataset_ref = bigquery.DatasetReference(project=self.project_id, dataset_id=self.dataset_name)
+
+        dataset_ref = bigquery.DatasetReference(
+            project=self.project_id, dataset_id=self.dataset_name
+        )
         dataset = bigquery.Dataset(dataset_ref)
         dataset.location = self.location
         dataset = self.client.create_dataset(dataset, exists_ok=True)
@@ -64,7 +63,7 @@ def _make_job(self, view: views.SQLView):
                 "tableId": f"{view.schema}__{view.name}" if view.schema else view.name,
             },
             "createDisposition": "CREATE_IF_NEEDED",
-            "writeDisposition": "WRITE_TRUNCATE"
+            "writeDisposition": "WRITE_TRUNCATE",
         },
         "labels": {
             "job_dataset": self.dataset_name,
@@ -109,9 +108,7 @@ def list_existing_view_names(self):
         ]

     def delete_view(self, view: views.View):
-        self.client.delete_table(
-            f"{self.project_id}.{self._make_view_path(view)}"
-        )
+        self.client.delete_table(f"{self.project_id}.{self._make_view_path(view)}")

     def get_columns(self, schema=None) -> pd.DataFrame:
         schema = schema or self.dataset_name
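For reference, the `create_dataset` body follows the documented google-cloud-bigquery flow; a standalone sketch with placeholder ids (needs GCP credentials to actually run):

```python
from google.cloud import bigquery

client = bigquery.Client(project="my-project")  # placeholder project id

# DatasetReference -> Dataset -> create_dataset(..., exists_ok=True), as above.
dataset_ref = bigquery.DatasetReference(project="my-project", dataset_id="analytics_max")
dataset = bigquery.Dataset(dataset_ref)
dataset.location = "EU"  # lea reads this from configuration; hard-coded here
dataset = client.create_dataset(dataset, exists_ok=True)  # no-op if it already exists
```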
(7 more changed files not shown.)