Skip to content

Commit

Permalink
Add snowflake insights (#50)
Browse files Browse the repository at this point in the history
* created a separate code location for snowflake_insights

* updated _process_partitioned_dbt_assets function to incorporate snowflake_insights functionality

* added missing dependencies

added missing comma

added gql dependency

add missing import

* add missing deps

* added missing deps

* Update Dockerfile

fix libcrypto

---------

Co-authored-by: Sean Lopp <[email protected]>
  • Loading branch information
izzye84 and slopp authored Nov 2, 2023
1 parent f911a36 commit d89c859
Show file tree
Hide file tree
Showing 9 changed files with 127 additions and 22 deletions.
16 changes: 16 additions & 0 deletions .github/workflows/deploy-dagster-cloud.yml
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,22 @@ jobs:
with:
command: "ci set-build-output --location-name=batch_enrichment --image-tag=$IMAGE_TAG-batch-enrichment"

# Build 'snowflake_insights' code location
- name: Build and upload Docker image for snowflake insights
if: steps.prerun.outputs.result != 'skip'
uses: docker/build-push-action@v4
with:
context: ./hooli_snowflake_insights
push: true
tags: ${{ env.IMAGE_REGISTRY }}:${{ env.IMAGE_TAG }}-snowflake-insights

- name: Update build session with image tag for snowflake insights
id: ci-set-build-output-snowflake-insights
if: steps.prerun.outputs.result != 'skip'
uses: dagster-io/dagster-cloud-action/actions/utils/[email protected]
with:
command: "ci set-build-output --location-name=snowflake_insights --image-tag=$IMAGE_TAG-snowflake-insights"

# Deploy
- name: Deploy to Dagster Cloud
id: ci-deploy
Expand Down
6 changes: 6 additions & 0 deletions dagster_cloud.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -27,4 +27,10 @@ locations:
build:
directory: ./hooli_batch_enrichment
registry: 764506304434.dkr.ecr.us-west-2.amazonaws.com/hooli-data-science-prod
- location_name: snowflake_insights
code_source:
python_file: definitions.py
build:
directory: ./hooli_snowflake_insights
registry: 764506304434.dkr.ecr.us-west-2.amazonaws.com/hooli-data-science-prod

6 changes: 5 additions & 1 deletion dbt_project/dbt_project.yml
Original file line number Diff line number Diff line change
Expand Up @@ -15,4 +15,8 @@ clean-targets:
- "dbt_packages"

models:
+materialized: table
+materialized: table

query-comment:
comment: "snowflake_dagster_dbt_v1_opaque_id[[[{{ node.unique_id }}:{{ invocation_id }}]]]"
append: true
49 changes: 29 additions & 20 deletions hooli_data_eng/assets/dbt_assets.py
Original file line number Diff line number Diff line change
@@ -1,26 +1,33 @@
from typing import Any, Mapping
from dagster._utils import file_relative_path
from dagster_dbt import DbtCliResource, DagsterDbtTranslator
from dagster_dbt import (
load_assets_from_dbt_project,
default_metadata_from_dbt_resource_props,
)
from dagster_dbt.asset_decorator import dbt_assets
import json
import textwrap
from dateutil import parser
from pathlib import Path
from typing import Any, Generator, Mapping, Union

from dagster import (
AutoMaterializePolicy,
AutoMaterializeRule,
AssetCheckResult,
AssetKey,
AssetObservation,
DailyPartitionsDefinition,
WeeklyPartitionsDefinition,
OpExecutionContext,
Output,
MetadataValue,
BackfillPolicy,
)
from dateutil import parser
import json
import textwrap
from pathlib import Path
from dagster_cloud.dagster_insights import dbt_with_snowflake_insights
from dagster_dbt import (
DbtCliEventMessage,
DbtCliInvocation,
DbtCliResource,
DagsterDbtTranslator,
load_assets_from_dbt_project,
default_metadata_from_dbt_resource_props,
)
from dagster_dbt.asset_decorator import dbt_assets
from dagster._utils import file_relative_path


# many dbt assets use an incremental approach to avoid
# re-processing all data on each run
Expand Down Expand Up @@ -117,11 +124,8 @@ def _process_partitioned_dbt_assets(context: OpExecutionContext, dbt2: DbtCliRes

dbt_cli_task = dbt2.cli(dbt_args, context=context)

dbt_events = list(dbt_cli_task.stream_raw_events())

for event in dbt_events:
# add custom metadata to the asset materialization event
context.log.info(event)
# This function adds model start and end time to derive total execution time
def handle_dbt_event(event: DbtCliEventMessage) -> Generator[Union[Output, AssetObservation, AssetCheckResult], None, None]:
for dagster_event in event.to_default_asset_events(
manifest=dbt_cli_task.manifest
):
Expand All @@ -130,7 +134,6 @@ def _process_partitioned_dbt_assets(context: OpExecutionContext, dbt2: DbtCliRes

started_at = parser.isoparse(event_node_info["node_started_at"])
completed_at = parser.isoparse(event_node_info["node_finished_at"])

metadata = {
"Execution Started At": started_at.isoformat(timespec="seconds"),
"Execution Completed At": completed_at.isoformat(
Expand All @@ -144,8 +147,14 @@ def _process_partitioned_dbt_assets(context: OpExecutionContext, dbt2: DbtCliRes
metadata=metadata,
output_name=dagster_event.output_name,
)

yield dagster_event

# This function emits an AssetObservation with the dbt model's invocation ID and unique ID (needed for Snowflake Insights)
def handle_all_dbt_events(dbt_cli_task: DbtCliInvocation) -> Generator[Union[Output, AssetObservation, AssetCheckResult], None, None]:
for raw_event in dbt_cli_task.stream_raw_events():
yield from handle_dbt_event(raw_event)

yield from dbt_with_snowflake_insights(context, dbt_cli_task, dagster_events=handle_all_dbt_events(dbt_cli_task))

if not dbt_cli_task.is_successful():
raise Exception("dbt command failed, see preceding events")
Expand Down
12 changes: 12 additions & 0 deletions hooli_snowflake_insights/Dockerfile
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
# Image for the standalone snowflake_insights Dagster code location.
FROM python:3.10-slim

WORKDIR /opt/dagster/app

# git is required so pip can install the pinned oscrypto commit below.
RUN apt-get update && apt-get install -y git

# Swap the released oscrypto for a pinned upstream commit.
# NOTE(review): presumably this works around oscrypto's incompatibility with
# the libcrypto/OpenSSL shipped in this base image (the commit message says
# "fix libcrypto") — confirm whether a released oscrypto version now
# includes the fix so the pin can be dropped.
RUN python -m pip uninstall oscrypto -y
RUN python -m pip install git+https://github.com/wbond/oscrypto.git@d5f3437ed24257895ae1edd9e503cfb352e635a8

# Copy the code location source (definitions.py, requirements.txt, ...).
ADD . .

RUN pip install -r requirements.txt
48 changes: 48 additions & 0 deletions hooli_snowflake_insights/definitions.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
import os

from dagster import Definitions, EnvVar, ResourceDefinition
from dagster_cloud.dagster_insights import (
create_snowflake_insights_asset_and_schedule,
)
from dagster_snowflake import SnowflakeResource

# Used to derive environment (LOCAL, BRANCH, PROD)
def get_env():
    """Return the current deployment environment: "BRANCH", "PROD", or "LOCAL".

    A branch deployment takes precedence over the prod check; anything that is
    neither a branch deployment nor the data-eng-prod deployment is LOCAL.
    """
    is_branch_deployment = os.getenv("DAGSTER_CLOUD_IS_BRANCH_DEPLOYMENT", "")
    deployment_name = os.getenv("DAGSTER_CLOUD_DEPLOYMENT_NAME", "")
    if is_branch_deployment == "1":
        return "BRANCH"
    return "PROD" if deployment_name == "data-eng-prod" else "LOCAL"

# Setting connection details by environment.
# Keys must cover every value get_env() can return; the selected entry is
# passed to Definitions(resources=...) below.
resource_def = {
    "LOCAL": {
        # No-op stand-in so the code location loads locally without
        # Snowflake credentials.
        "snowflake_insights": ResourceDefinition.none_resource(),
    },
    "BRANCH": {
        "snowflake_insights": SnowflakeResource(
            account=EnvVar("SNOWFLAKE_ACCOUNT"),
            user=EnvVar("SNOWFLAKE_USER"),
            password=EnvVar("SNOWFLAKE_PASSWORD"),
        ),
    },
    # NOTE(review): BRANCH and PROD are configured identically here —
    # presumably the environment variables hold different credentials per
    # deployment; confirm before consolidating the two entries.
    "PROD": {
        "snowflake_insights": SnowflakeResource(
            account=EnvVar("SNOWFLAKE_ACCOUNT"),
            user=EnvVar("SNOWFLAKE_USER"),
            password=EnvVar("SNOWFLAKE_PASSWORD"),
        ),
    },
}

# Creates an asset (poll_snowflake_query_history_hour) and sets its schedule.
# NOTE(review): the first argument looks like the start hour from which query
# history is collected ("2023-10-29-00:00") — confirm the expected format
# against the dagster_cloud insights docs. The resource key must match a key
# in each environment's entry of resource_def above.
snowflake_insights_definitions = create_snowflake_insights_asset_and_schedule(
    "2023-10-29-00:00",
    snowflake_resource_key="snowflake_insights",
)

# Register the generated asset(s) and schedule, binding the resources for
# the environment we are currently running in (LOCAL, BRANCH, or PROD).
defs = Definitions(
    assets=[*snowflake_insights_definitions.assets,],
    schedules=[snowflake_insights_definitions.schedule,],
    resources=resource_def[get_env()],
)
6 changes: 6 additions & 0 deletions hooli_snowflake_insights/requirements.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
dagster
dagster-cloud
dagster-dbt
dagster-snowflake
gql
requests_toolbelt
4 changes: 3 additions & 1 deletion requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -43,4 +43,6 @@
dbt-duckdb
responses
plotnine
requests
requests
gql
requests_toolbelt
2 changes: 2 additions & 0 deletions setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,9 +23,11 @@
"dagster-pyspark",
"dagster-databricks",
"dagstermill",
"gql",
"plotnine",
"responses",
"requests",
"requests_toolbelt",
"html5lib",
"scikit-learn"
],
Expand Down

0 comments on commit d89c859

Please sign in to comment.