Skip to content

Commit

Permalink
Migrating dbt to 1.6 and programmatic invocations
Browse files Browse the repository at this point in the history
  • Loading branch information
pgoslatara committed Oct 13, 2023
1 parent 11e87aa commit e62a6de
Show file tree
Hide file tree
Showing 7 changed files with 29 additions and 24 deletions.
6 changes: 6 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -240,6 +240,12 @@ TODO

# Others

## Running dbt from python

In version 1.5, dbt introduced [programmatic invocations](https://docs.getdbt.com/reference/programmatic-invocations), a way of calling dbt commands natively from Python, including the ability to retrieve the returned data. Previous approaches mostly relied on opening a new shell process and calling the dbt CLI, which was not ideal for several reasons, including security. This repo further abstracts programmatic invocations into a dedicated helper function; see `run_dbt_command` in `./scripts/utils.py`.

## Conferences

This repository accompanies some conference talks:
- [NL dbt meetup: 2nd Edition](https://www.meetup.com/amsterdam-dbt-meetup/events/293640417/): "CI for dbt: Beyond the basics!", slides available [here](https://docs.google.com/presentation/d/1Y5fx4h97IY0wpsutt92nPLO1UDUcrq6YdVKt-UuL93c/edit#slide=id.p).
- [MDSFest](https://www.linkedin.com/events/7091868349487353856/): "CI for dbt: Beyond the basics!", slides available [here](https://docs.google.com/presentation/d/1M0475jIX41uxT-nLPWlymUkstuUzkq-LppZqT_o759Q/edit#slide=id.g260e469f8e9_0_7), video available [here](https://www.youtube.com/watch?v=bRKk6F07G58).
2 changes: 1 addition & 1 deletion dbt_project.yml
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ clean-targets:
- "target"
- "dbt_packages"

require-dbt-version: [">=1.5.0", "<1.6.0"]
require-dbt-version: [">=1.6.0", "<1.7.0"]

models:
+labels:
Expand Down
4 changes: 2 additions & 2 deletions requirements.txt
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
dbt-bigquery>=1.5.0,<1.6.0
dbt-bigquery>=1.6.0,<1.7.0
pre-commit
pytest
pytest-xdist
shandy-sqlfmt[jinjafmt]==0.18.1
shandy-sqlfmt[jinjafmt]==0.20.0
1 change: 0 additions & 1 deletion requirements_dev.txt
Original file line number Diff line number Diff line change
Expand Up @@ -7,4 +7,3 @@ google-cloud-storage>=2.7.0,<3.0.0
pyarrow
pytablewriter
retry
sh>=2.0.0
8 changes: 4 additions & 4 deletions scripts/mart_monitor_commenter.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
delete_github_pr_bot_comments,
download_manifest_json,
get_gcp_auth_clients,
run_dbt_shell_command,
run_dbt_command,
send_github_pr_comment,
set_logging_options,
)
Expand Down Expand Up @@ -66,15 +66,15 @@ def compare_manifests_and_comment_impacted_models(
)

directly_impacted_models = sorted(
run_dbt_shell_command(
run_dbt_command(
f"dbt --quiet ls --select state:modified --state ./.state --resource-type model --target {env}"
)
)
logging.info(f"{directly_impacted_models=}")
direct_md = "\n".join([f'| {x.split(".")[-1]} |' for x in directly_impacted_models])

indirectly_impacted_models = sorted(
run_dbt_shell_command(
run_dbt_command(
f"dbt --quiet ls --select state:modified+ --state ./.state --resource-type model --target {env}"
)
)
Expand All @@ -87,7 +87,7 @@ def compare_manifests_and_comment_impacted_models(
manifest_json = json.load(f)

impacted_exposures = sorted(
run_dbt_shell_command(
run_dbt_command(
f"dbt --quiet ls --select state:modified+ --state ./.state --resource-type exposure --target {env}"
)
)
Expand Down
8 changes: 4 additions & 4 deletions scripts/run_dbt_backfill.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
ManifestInitRunError,
call_github_api,
download_manifest_json,
run_dbt_shell_command,
run_dbt_command,
send_github_pr_comment,
set_logging_options,
)
Expand All @@ -31,7 +31,7 @@ def run_dbt_backfill(env: str) -> None:
)

# List modified nodes
modified_nodes_raw = run_dbt_shell_command(
modified_nodes_raw = run_dbt_command(
f"dbt ls --select state:modified,package:beyond_basics --state ./.state --resource-type model --target {env}"
)

Expand Down Expand Up @@ -60,7 +60,7 @@ def run_dbt_backfill(env: str) -> None:
)

# Fully refresh modified nodes and their downstream dependencies
run_dbt_shell_command(
run_dbt_command(
f"dbt build --select state:modified+,package:beyond_basics --state ./.state --full-refresh --target {env}",
foreground=True, # prints logs in GitHub workflow in real time to help with monitoring and logging
)
Expand Down Expand Up @@ -99,7 +99,7 @@ def main() -> None:

if (
"init_run" not in locals()
): # i.e. on inital run no manifest.json to compare with so need to skip
): # i.e. on initial run no manifest.json to compare with so need to skip
run_dbt_backfill(target_branch)


Expand Down
24 changes: 12 additions & 12 deletions scripts/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,13 +4,13 @@
import re
from datetime import datetime
from pathlib import Path
from typing import Any, List, Mapping, Optional, Union
from typing import Any, Mapping, Optional, Union

import requests
from dbt.cli.main import dbtRunner
from google.cloud import bigquery, storage
from google.oauth2 import service_account
from retry import retry
from sh import dbt


class GitHubAPIRateLimitError(Exception):
Expand Down Expand Up @@ -232,19 +232,19 @@ def get_gcp_auth_clients(env: str) -> dict:
}


def run_dbt_shell_command(
command: str, foreground: bool = False
) -> Optional[Union[List[str], None]]:
"""
Runs dbt on the command line anre returns the output as a non-null newline-delimited list of strings
def run_dbt_command(
dbt_command: str,
) -> None:
"""Runs a dbt command.
Args:
dbt_command (str): The dbt command to be run, e.g. "dbt parse"
"""

logging.info(f"Running: {command}...")
res = dbtRunner().invoke(dbt_command.split(" ")[1:])

if foreground is False:
return [x for x in dbt(command.split(" ")[1:]).split("\n") if x != ""]
else:
dbt(command.split(" ")[1:], _fg=foreground)
if res.exception:
raise RuntimeError("dbt command did not complete successfully.")


def send_github_pr_comment(pull_request_id: int, message: str) -> str:
Expand Down

0 comments on commit e62a6de

Please sign in to comment.