Skip to content
This repository has been archived by the owner on Apr 8, 2024. It is now read-only.

Commit

Permalink
Support notebook models (#316)
Browse files Browse the repository at this point in the history
* Notebook support for python models and globals

* Generate Python models SQL in fal directory inside models directory, mimicking structure

* Add notebook example and generated SQL
  • Loading branch information
chamini2 authored May 19, 2022
1 parent a2b996a commit abf5089
Show file tree
Hide file tree
Showing 7 changed files with 138 additions and 36 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@

{{ config(materialized='ephemeral') }}
/*
FAL_GENERATED f3d686c040e94a5b33aa082f0ddcd6d3
Script dependencies:
{{ ref('model_c') }}
*/

SELECT * FROM {{ target.schema }}.{{ model.name }}
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@

{{ config(materialized='ephemeral') }}
/*
FAL_GENERATED ed955dd5ab415f33c13a1efa0de2fb89
Script dependencies:
{{ ref("model_b") }}
{{ ref('model_a') }}
*/

SELECT * FROM {{ target.schema }}.{{ model.name }}
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
{
"cells": [
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"from faldbt.magics import init_fal\n",
"%init_fal project_dir=../.. profiles_dir=../../.. default_model_name=model_e_notebook"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"df = ref(\"model_c\")\n",
"\n",
"df[\"my_null\"] = None\n",
"\n",
"write_to_model(df)"
]
}
],
"metadata": {
"language_info": {
"name": "python"
},
"orig_nbformat": 4
},
"nbformat": 4,
"nbformat_minor": 2
}
66 changes: 54 additions & 12 deletions src/fal/cli/model_generator/model_generator.py
Original file line number Diff line number Diff line change
@@ -1,14 +1,18 @@
import ast
import re
from typing import List, cast
from pathlib import Path
from fal.fal_script import python_from_file

from faldbt.parse import load_dbt_project_contract
from fal.cli.model_generator.deps_generator import generate_dbt_dependencies

from dbt.logger import GLOBAL_LOGGER as logger

SQL_MODEL_TEMPLATE = """
{{ config(materialized='ephemeral') }}
/*
FAL_GENERATED
FAL_GENERATED __checksum__
Script dependencies:
Expand All @@ -26,36 +30,74 @@ def generate_python_dbt_models(project_dir: str):
model_paths = map(
project_path.joinpath, cast(List[str], project_contract.model_paths)
)
python_paths = _find_python_files(list(model_paths))

python_paths: List[Path] = []
for model_path in model_paths:
python_paths.extend(_generate_python_dbt_models(model_path))

return dict([(path.stem, path) for path in python_paths])


# Sub-directory (relative to each dbt model directory) that receives the
# generated SQL files.
GENERATED_DIR = Path("fal")


def _generate_python_dbt_models(model_path: Path) -> List[Path]:
    """Write a generated SQL model for every Python model under *model_path*.

    Every script or notebook returned by ``_find_python_files`` gets a ``.sql``
    file at the mirrored location below ``GENERATED_DIR``. The file content is
    ``SQL_MODEL_TEMPLATE`` with the script's dbt dependencies and a checksum of
    the generated text filled in.

    Args:
        model_path: A dbt model directory to scan.

    Returns:
        The Python model paths that were processed. The caller extends a list
        with this value and reads ``path.stem`` from each entry, so a list of
        paths (not a dict) is returned.

    Raises:
        RuntimeError: Propagated from ``_check_path_safe_to_write`` when the
            target SQL file exists and does not carry a matching checksum.
    """
    python_paths = _find_python_files(model_path)

    for py_path in python_paths:
        # models/staging/model.py -> models/fal/staging/model.sql
        py_relative_path = py_path.relative_to(model_path)
        sql_relative_path = GENERATED_DIR.joinpath(py_relative_path).with_suffix(".sql")
        sql_path = model_path.joinpath(sql_relative_path)

        _check_path_safe_to_write(sql_path, py_path)

        # python_from_file also handles notebooks, so the source is parsed
        # exactly once from its output instead of from the raw file.
        source_code = python_from_file(py_path)
        module = ast.parse(source_code, str(py_path), "exec")

        dbt_deps: str = generate_dbt_dependencies(module)
        sql_contents = SQL_MODEL_TEMPLATE.replace("__deps__", dbt_deps)
        # The checksum is computed while the placeholder is still in place;
        # _checksum strips the value after the marker before hashing.
        checksum, _ = _checksum(sql_contents)
        sql_contents = sql_contents.replace("__checksum__", checksum)

        sql_path.parent.mkdir(parents=True, exist_ok=True)
        with open(sql_path, "w") as file:
            file.write(sql_contents)

    return python_paths


# TODO: unit tests
def _check_path_safe_to_write(sql_path: Path, py_path: Path):
    """Refuse to overwrite a SQL file that is not an untouched fal output.

    A missing file is always safe. An existing file is safe only when it
    records a checksum after the ``FAL_GENERATED`` marker and that checksum
    equals the one recomputed from its contents by ``_checksum``.

    Args:
        sql_path: Location the generated SQL model is about to be written to.
        py_path: Python model the SQL would be generated from; used only in
            the error message.

    Raises:
        RuntimeError: If ``sql_path`` exists and has no recorded checksum or
            the recorded checksum differs from the recomputed one.
    """
    if not sql_path.exists():
        return

    with open(sql_path, "r") as file:
        contents = file.read()

    # Always compare checksums: checking only for the marker text would let a
    # hand-edited file that still contains the marker be overwritten.
    checksum, found = _checksum(contents)
    if found and checksum == found:
        return

    logger.debug(f"Calculated checksum: {checksum}\nFound checksum: {found}")
    raise RuntimeError(
        f"File '{sql_path}' not generated by fal would be overwritten by generated model of '{py_path}'. Please rename or remove."
    )


# List-based variant: for every directory in `model_paths`, recursively glob
# "*.py", flatten the per-directory results into one list, and keep only the
# entries that are regular files.
# NOTE(review): a function with the same name but a single-Path signature is
# defined further down in this file and would shadow this one; presumably this
# copy is the pre-change side of the diff — confirm before relying on it.
def _find_python_files(model_paths: List[Path]) -> List[Path]:
paths_lists = map(lambda p: list(p.rglob("*.py")), model_paths)
# sum(..., []) concatenates the per-directory lists into a single flat list.
flat_paths = sum(paths_lists, [])
flat_files = filter(lambda p: p.is_file(), flat_paths)
return list(flat_files)
# Matches the generation marker followed by the checksum recorded next to it.
CHECKSUM_REGEX = re.compile(r"FAL_GENERATED ([_\d\w]+)")


def _checksum(contents: str):
    """Return ``(calculated, recorded)`` checksums for generated SQL text.

    ``calculated`` is the MD5 hex digest of the stripped text after every
    ``FAL_GENERATED <value>`` occurrence has been reduced to the bare marker,
    so the digest does not depend on the value stored in the file.
    ``recorded`` is the value following the first marker in *contents*, or
    ``None`` when no marker with a value is present.
    """
    from hashlib import md5

    marker = CHECKSUM_REGEX.search(contents)
    recorded = None if marker is None else marker.group(1)

    normalized = CHECKSUM_REGEX.sub("FAL_GENERATED", contents.strip())
    calculated = md5(normalized.encode("utf-8")).hexdigest()

    return calculated, recorded


def _find_python_files(model_path: Path) -> List[Path]:
    """Return every Python script and notebook file below *model_path*.

    The directory is searched recursively for ``*.py`` and ``*.ipynb``; only
    regular files are kept. Scripts come first, then notebooks.

    Files inside a ``.ipynb_checkpoints`` directory are skipped: Jupyter
    stores autosave copies of notebooks there, and treating those copies as
    models would generate SQL for them as well.

    Args:
        model_path: Root directory to search.

    Returns:
        The matching file paths, each starting with ``model_path``.
    """
    checkpoints_dir = ".ipynb_checkpoints"
    candidates = [*model_path.rglob("*.py"), *model_path.rglob("*.ipynb")]
    return [
        p
        for p in candidates
        # Only the part below model_path is inspected, so the location of the
        # model directory itself never causes files to be dropped.
        if p.is_file() and checkpoints_dir not in p.relative_to(model_path).parts
    ]
16 changes: 6 additions & 10 deletions src/fal/cli/selectors.py
Original file line number Diff line number Diff line change
Expand Up @@ -235,17 +235,13 @@ def depth(self, selector: str) -> Optional[int]:
OP_CHILDREN = SelectorGraphOpDepth(re.compile("(?P<rest>.*)\\+(?P<depth>\\d*)$"))


# Precompiled once at import time; both patterns are anchored at the end so
# only ids that finish with a "py" or "ipynb" extension match.
# NOTE(review): the dots are unescaped and therefore match any character;
# presumably literal "." separators were intended — confirm before tightening.
IS_BEFORE_SCRIPT_REGEX = re.compile("^script.*.BEFORE.*.(ipynb|py)$")
IS_AFTER_SCRIPT_REGEX = re.compile("^script.*.AFTER.*.(ipynb|py)$")


def _is_before_script(id: str) -> bool:
    """Return True when *id* matches the BEFORE-script pattern.

    Uses the module-level ``IS_BEFORE_SCRIPT_REGEX`` instead of compiling a
    pattern on every call.
    """
    return bool(IS_BEFORE_SCRIPT_REGEX.match(id))


def _is_after_script(id: str) -> bool:
    """Return True when *id* matches the AFTER-script pattern.

    Uses the module-level ``IS_AFTER_SCRIPT_REGEX`` instead of compiling a
    pattern on every call.
    """
    return bool(IS_AFTER_SCRIPT_REGEX.match(id))
29 changes: 16 additions & 13 deletions src/fal/fal_script.py
Original file line number Diff line number Diff line change
Expand Up @@ -83,12 +83,8 @@ def exec(self, faldbt: FalDbt):
"""
# Enable local imports
try:
if str(self.path).endswith(".ipynb"):
raw_source_code = _process_ipynb(str(self.path))
source_code = compile(raw_source_code, self.path, "exec")
else:
with open(self.path) as file:
source_code = compile(file.read(), self.path, "exec")
source_code = python_from_file(self.path)
program = compile(source_code, self.path, "exec")

exec_globals = {
"context": self._build_script_context(),
Expand All @@ -109,7 +105,7 @@ def exec(self, faldbt: FalDbt):
faldbt.write_to_model, target_1=self.model.name, target_2=None
)

exec(source_code, exec_globals)
exec(program, exec_globals)
finally:
pass

Expand Down Expand Up @@ -170,16 +166,23 @@ def _process_tests(tests: List[Any]):
)


def _process_ipynb(filepath: str) -> str:
def python_from_file(path: Path) -> str:
    """Return the Python source held in a script or notebook file.

    Args:
        path: Location of a ``.py`` script or an ``.ipynb`` notebook. A plain
            string is accepted as well and converted to ``Path``.

    Returns:
        The file text for scripts; for notebooks, the result of passing the
        file text through ``_process_ipynb``.
    """
    # Callers may hold the location as a str; .suffix needs a Path.
    path = Path(path)
    # Read as UTF-8 explicitly: the platform default encoding would corrupt
    # non-ASCII source or notebook text on non-UTF-8 locales.
    with open(path, encoding="utf-8") as file:
        raw_source_code = file.read()
    if path.suffix == ".ipynb":
        raw_source_code = _process_ipynb(raw_source_code)
    return raw_source_code


def _process_ipynb(raw_source_code: str) -> str:
def strip_magic(source: List[str]) -> List[str]:
IMPORT_STMT = "from faldbt.magics import init_fal"
return (item for item in source if item[0] != "%" and item != IMPORT_STMT)
NOTEBOOK_LIB = "faldbt.magics"
return [item for item in source if item[0] != "%" and NOTEBOOK_LIB not in item]

with open(filepath) as raw_data:
raw_script = json.load(raw_data)
ipynb_struct = json.loads(raw_source_code)

script_list = []
for cell in raw_script["cells"]:
for cell in ipynb_struct["cells"]:
if cell["cell_type"] == "code":
source = strip_magic(cell["source"])
script_list.append("".join(source))
Expand Down
3 changes: 2 additions & 1 deletion src/faldbt/parse.py
Original file line number Diff line number Diff line change
Expand Up @@ -115,7 +115,8 @@ def get_dbt_results(


def get_scripts_list(scripts_dir: str) -> List[str]:
    """List every script and notebook file path below *scripts_dir*.

    The directory is searched recursively for ``*.py`` and then ``*.ipynb``.

    Args:
        scripts_dir: Directory to search.

    Returns:
        The matching paths as strings, scripts first and notebooks after.
    """
    scripts_path = Path(scripts_dir)
    return [
        str(script)
        for pattern in ("*.py", "*.ipynb")
        for script in scripts_path.rglob(pattern)
    ]


def get_global_script_configs(source_dirs: List[Path]) -> Dict[str, List[str]]:
Expand Down

0 comments on commit abf5089

Please sign in to comment.