Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add snowflake insights #50

Merged
merged 6 commits into from
Nov 2, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 16 additions & 0 deletions .github/workflows/deploy-dagster-cloud.yml
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,22 @@ jobs:
with:
command: "ci set-build-output --location-name=batch_enrichment --image-tag=$IMAGE_TAG-batch-enrichment"

# Build 'snowflake_insights' code location
- name: Build and upload Docker image for snowflake insights
if: steps.prerun.outputs.result != 'skip'
uses: docker/build-push-action@v4
with:
context: ./hooli_snowflake_insights
push: true
tags: ${{ env.IMAGE_REGISTRY }}:${{ env.IMAGE_TAG }}-snowflake-insights

- name: Update build session with image tag for snowflake insights
id: ci-set-build-output-snowflake-insights
if: steps.prerun.outputs.result != 'skip'
uses: dagster-io/dagster-cloud-action/actions/utils/[email protected]
with:
command: "ci set-build-output --location-name=snowflake_insights --image-tag=$IMAGE_TAG-snowflake-insights"

# Deploy
- name: Deploy to Dagster Cloud
id: ci-deploy
Expand Down
6 changes: 6 additions & 0 deletions dagster_cloud.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -27,4 +27,10 @@ locations:
build:
directory: ./hooli_batch_enrichment
registry: 764506304434.dkr.ecr.us-west-2.amazonaws.com/hooli-data-science-prod
- location_name: snowflake_insights
code_source:
python_file: definitions.py
build:
directory: ./hooli_snowflake_insights
registry: 764506304434.dkr.ecr.us-west-2.amazonaws.com/hooli-data-science-prod

6 changes: 5 additions & 1 deletion dbt_project/dbt_project.yml
Original file line number Diff line number Diff line change
Expand Up @@ -15,4 +15,8 @@ clean-targets:
- "dbt_packages"

models:
+materialized: table
+materialized: table

query-comment:
comment: "snowflake_dagster_dbt_v1_opaque_id[[[{{ node.unique_id }}:{{ invocation_id }}]]]"
append: true
49 changes: 29 additions & 20 deletions hooli_data_eng/assets/dbt_assets.py
Original file line number Diff line number Diff line change
@@ -1,26 +1,33 @@
from typing import Any, Mapping
from dagster._utils import file_relative_path
from dagster_dbt import DbtCliResource, DagsterDbtTranslator
from dagster_dbt import (
load_assets_from_dbt_project,
default_metadata_from_dbt_resource_props,
)
from dagster_dbt.asset_decorator import dbt_assets
import json
import textwrap
from dateutil import parser
from pathlib import Path
from typing import Any, Generator, Mapping, Union

from dagster import (
AutoMaterializePolicy,
AutoMaterializeRule,
AssetCheckResult,
AssetKey,
AssetObservation,
DailyPartitionsDefinition,
WeeklyPartitionsDefinition,
OpExecutionContext,
Output,
MetadataValue,
BackfillPolicy,
)
from dateutil import parser
import json
import textwrap
from pathlib import Path
from dagster_cloud.dagster_insights import dbt_with_snowflake_insights
from dagster_dbt import (
DbtCliEventMessage,
DbtCliInvocation,
DbtCliResource,
DagsterDbtTranslator,
load_assets_from_dbt_project,
default_metadata_from_dbt_resource_props,
)
from dagster_dbt.asset_decorator import dbt_assets
from dagster._utils import file_relative_path


# many dbt assets use an incremental approach to avoid
# re-processing all data on each run
Expand Down Expand Up @@ -117,11 +124,8 @@ def _process_partitioned_dbt_assets(context: OpExecutionContext, dbt2: DbtCliRes

dbt_cli_task = dbt2.cli(dbt_args, context=context)

dbt_events = list(dbt_cli_task.stream_raw_events())

for event in dbt_events:
# add custom metadata to the asset materialization event
context.log.info(event)
# This function adds model start and end time to derive total execution time
def handle_dbt_event(event: DbtCliEventMessage) -> Generator[Union[Output, AssetObservation, AssetCheckResult], None, None]:
for dagster_event in event.to_default_asset_events(
manifest=dbt_cli_task.manifest
):
Expand All @@ -130,7 +134,6 @@ def _process_partitioned_dbt_assets(context: OpExecutionContext, dbt2: DbtCliRes

started_at = parser.isoparse(event_node_info["node_started_at"])
completed_at = parser.isoparse(event_node_info["node_finished_at"])

metadata = {
"Execution Started At": started_at.isoformat(timespec="seconds"),
"Execution Completed At": completed_at.isoformat(
Expand All @@ -144,8 +147,14 @@ def _process_partitioned_dbt_assets(context: OpExecutionContext, dbt2: DbtCliRes
metadata=metadata,
output_name=dagster_event.output_name,
)

yield dagster_event

# This function emits an AssetObservation with the dbt model's invocation ID and unique ID (needed for Snowflake Insights)
def handle_all_dbt_events(dbt_cli_task: DbtCliInvocation) -> Generator[Union[Output, AssetObservation, AssetCheckResult], None, None]:
for raw_event in dbt_cli_task.stream_raw_events():
yield from handle_dbt_event(raw_event)

yield from dbt_with_snowflake_insights(context, dbt_cli_task, dagster_events=handle_all_dbt_events(dbt_cli_task))

if not dbt_cli_task.is_successful():
raise Exception("dbt command failed, see preceding events")
Expand Down
12 changes: 12 additions & 0 deletions hooli_snowflake_insights/Dockerfile
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
FROM python:3.10-slim

WORKDIR /opt/dagster/app

RUN apt-get update && apt-get install -y git

RUN python -m pip uninstall oscrypto -y
RUN python -m pip install git+https://github.com/wbond/oscrypto.git@d5f3437ed24257895ae1edd9e503cfb352e635a8

ADD . .

RUN pip install -r requirements.txt
48 changes: 48 additions & 0 deletions hooli_snowflake_insights/definitions.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
import os

from dagster import Definitions, EnvVar, ResourceDefinition
from dagster_cloud.dagster_insights import (
create_snowflake_insights_asset_and_schedule,
)
from dagster_snowflake import SnowflakeResource

# Used to derive environment (LOCAL, BRANCH, PROD)
def get_env():
if os.getenv("DAGSTER_CLOUD_IS_BRANCH_DEPLOYMENT", "") == "1":
return "BRANCH"
if os.getenv("DAGSTER_CLOUD_DEPLOYMENT_NAME", "") == "data-eng-prod":
return "PROD"
return "LOCAL"

# Setting connection details by environment
resource_def = {
"LOCAL": {
"snowflake_insights": ResourceDefinition.none_resource(),
},
"BRANCH": {
"snowflake_insights": SnowflakeResource(
account=EnvVar("SNOWFLAKE_ACCOUNT"),
user=EnvVar("SNOWFLAKE_USER"),
password=EnvVar("SNOWFLAKE_PASSWORD"),
),
},
"PROD": {
"snowflake_insights": SnowflakeResource(
account=EnvVar("SNOWFLAKE_ACCOUNT"),
user=EnvVar("SNOWFLAKE_USER"),
password=EnvVar("SNOWFLAKE_PASSWORD"),
),
},
}

# Creates an asset (poll_snowflake_query_history_hour) and sets its schedule
snowflake_insights_definitions = create_snowflake_insights_asset_and_schedule(
"2023-10-29-00:00",
snowflake_resource_key="snowflake_insights",
)

defs = Definitions(
assets=[*snowflake_insights_definitions.assets,],
schedules=[snowflake_insights_definitions.schedule,],
resources=resource_def[get_env()],
)
6 changes: 6 additions & 0 deletions hooli_snowflake_insights/requirements.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
dagster
dagster-cloud
dagster-dbt
dagster-snowflake
gql
requests_toolbelt
4 changes: 3 additions & 1 deletion requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -43,4 +43,6 @@
dbt-duckdb
responses
plotnine
requests
requests
gql
requests_toolbelt
2 changes: 2 additions & 0 deletions setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,9 +23,11 @@
"dagster-pyspark",
"dagster-databricks",
"dagstermill",
"gql",
"plotnine",
"responses",
"requests",
"requests_toolbelt",
"html5lib",
"scikit-learn"
],
Expand Down
Loading