From d89c859686085e5d89386eb108fe9f1197263feb Mon Sep 17 00:00:00 2001
From: izzy <60406698+izzye84@users.noreply.github.com>
Date: Thu, 2 Nov 2023 12:29:12 -0600
Subject: [PATCH] Add snowflake insights (#50)

* created a separate code location for snowflake_insights

* updated _process_partitioned_dbt_assets function to incorporate snowflake_insights functionality

* added missing dependencies

added missing comma

added gql dependency

add missing import

* add missing deps

* added missing deps

* Update Dockerfile

fix libcrypto

---------

Co-authored-by: Sean Lopp
---
 .github/workflows/deploy-dagster-cloud.yml | 16 +++++++
 dagster_cloud.yaml                         |  6 +++
 dbt_project/dbt_project.yml                |  6 ++-
 hooli_data_eng/assets/dbt_assets.py        | 49 +++++++++++++---------
 hooli_snowflake_insights/Dockerfile        | 12 ++++++
 hooli_snowflake_insights/definitions.py    | 48 +++++++++++++++++++++
 hooli_snowflake_insights/requirements.txt  |  6 +++
 requirements.txt                           |  4 +-
 setup.py                                   |  2 +
 9 files changed, 127 insertions(+), 22 deletions(-)
 create mode 100644 hooli_snowflake_insights/Dockerfile
 create mode 100644 hooli_snowflake_insights/definitions.py
 create mode 100644 hooli_snowflake_insights/requirements.txt

diff --git a/.github/workflows/deploy-dagster-cloud.yml b/.github/workflows/deploy-dagster-cloud.yml
index 5fc5a9ab..5ad06ab3 100644
--- a/.github/workflows/deploy-dagster-cloud.yml
+++ b/.github/workflows/deploy-dagster-cloud.yml
@@ -118,6 +118,22 @@ jobs:
         with:
           command: "ci set-build-output --location-name=batch_enrichment --image-tag=$IMAGE_TAG-batch-enrichment"
 
+      # Build 'snowflake_insights' code location
+      - name: Build and upload Docker image for snowflake insights
+        if: steps.prerun.outputs.result != 'skip'
+        uses: docker/build-push-action@v4
+        with:
+          context: ./hooli_snowflake_insights
+          push: true
+          tags: ${{ env.IMAGE_REGISTRY }}:${{ env.IMAGE_TAG }}-snowflake-insights
+
+      - name: Update build session with image tag for snowflake insights
+        id: ci-set-build-output-snowflake-insights
+        if: steps.prerun.outputs.result != 'skip'
+        uses: dagster-io/dagster-cloud-action/actions/utils/dagster-cloud-cli@v0.1.27
+        with:
+          command: "ci set-build-output --location-name=snowflake_insights --image-tag=$IMAGE_TAG-snowflake-insights"
+
       # Deploy
       - name: Deploy to Dagster Cloud
         id: ci-deploy
diff --git a/dagster_cloud.yaml b/dagster_cloud.yaml
index c22def60..a918cf95 100644
--- a/dagster_cloud.yaml
+++ b/dagster_cloud.yaml
@@ -27,4 +27,10 @@ locations:
     build:
       directory: ./hooli_batch_enrichment
      registry: 764506304434.dkr.ecr.us-west-2.amazonaws.com/hooli-data-science-prod
+  - location_name: snowflake_insights
+    code_source:
+      python_file: definitions.py
+    build:
+      directory: ./hooli_snowflake_insights
+      registry: 764506304434.dkr.ecr.us-west-2.amazonaws.com/hooli-data-science-prod
diff --git a/dbt_project/dbt_project.yml b/dbt_project/dbt_project.yml
index e7393c1d..8c523b56 100644
--- a/dbt_project/dbt_project.yml
+++ b/dbt_project/dbt_project.yml
@@ -15,4 +15,8 @@ clean-targets:
   - "dbt_packages"
 
 models:
-  +materialized: table
\ No newline at end of file
+  +materialized: table
+
+query-comment:
+  comment: "snowflake_dagster_dbt_v1_opaque_id[[[{{ node.unique_id }}:{{ invocation_id }}]]]"
+  append: true
\ No newline at end of file
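
Aside on the query-comment change above (an illustrative sketch, not part of this patch): with append: true, dbt attaches the rendered comment to every query it issues against Snowflake, and Dagster Insights later looks for that opaque ID in Snowflake's query history to attribute cost and runtime back to individual dbt models. The helper below only mirrors the shape of the Jinja template; the function name and example values are made up.

    def render_opaque_id(node_unique_id: str, invocation_id: str) -> str:
        # Mirrors "snowflake_dagster_dbt_v1_opaque_id[[[{{ node.unique_id }}:{{ invocation_id }}]]]"
        return f"snowflake_dagster_dbt_v1_opaque_id[[[{node_unique_id}:{invocation_id}]]]"

    # Example with made-up IDs:
    # render_opaque_id("model.dbt_project.orders", "5f2ab1c0-1234-5678-9abc-def012345678")
    # -> "snowflake_dagster_dbt_v1_opaque_id[[[model.dbt_project.orders:5f2ab1c0-1234-5678-9abc-def012345678]]]"
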
diff --git a/hooli_data_eng/assets/dbt_assets.py b/hooli_data_eng/assets/dbt_assets.py
index 41954d72..8fba7bcf 100644
--- a/hooli_data_eng/assets/dbt_assets.py
+++ b/hooli_data_eng/assets/dbt_assets.py
@@ -1,26 +1,33 @@
-from typing import Any, Mapping
-from dagster._utils import file_relative_path
-from dagster_dbt import DbtCliResource, DagsterDbtTranslator
-from dagster_dbt import (
-    load_assets_from_dbt_project,
-    default_metadata_from_dbt_resource_props,
-)
-from dagster_dbt.asset_decorator import dbt_assets
+import json
+import textwrap
+from dateutil import parser
+from pathlib import Path
+from typing import Any, Generator, Mapping, Union
+
 from dagster import (
     AutoMaterializePolicy,
     AutoMaterializeRule,
+    AssetCheckResult,
     AssetKey,
+    AssetObservation,
     DailyPartitionsDefinition,
     WeeklyPartitionsDefinition,
     OpExecutionContext,
     Output,
-    MetadataValue,
     BackfillPolicy,
 )
-from dateutil import parser
-import json
-import textwrap
-from pathlib import Path
+from dagster_cloud.dagster_insights import dbt_with_snowflake_insights
+from dagster_dbt import (
+    DbtCliEventMessage,
+    DbtCliInvocation,
+    DbtCliResource,
+    DagsterDbtTranslator,
+    load_assets_from_dbt_project,
+    default_metadata_from_dbt_resource_props,
+)
+from dagster_dbt.asset_decorator import dbt_assets
+from dagster._utils import file_relative_path
+
 
 # many dbt assets use an incremental approach to avoid
 # re-processing all data on each run
@@ -117,11 +124,8 @@ def _process_partitioned_dbt_assets(context: OpExecutionContext, dbt2: DbtCliRes
 
     dbt_cli_task = dbt2.cli(dbt_args, context=context)
 
-    dbt_events = list(dbt_cli_task.stream_raw_events())
-
-    for event in dbt_events:
-        # add custom metadata to the asset materialization event
-        context.log.info(event)
+    # This function adds model start and end time to derive total execution time
+    def handle_dbt_event(event: DbtCliEventMessage) -> Generator[Union[Output, AssetObservation, AssetCheckResult], None, None]:
         for dagster_event in event.to_default_asset_events(
             manifest=dbt_cli_task.manifest
         ):
@@ -130,7 +134,6 @@ def _process_partitioned_dbt_assets(context: OpExecutionContext, dbt2: DbtCliRes
 
                 started_at = parser.isoparse(event_node_info["node_started_at"])
                 completed_at = parser.isoparse(event_node_info["node_finished_at"])
-
                 metadata = {
                     "Execution Started At": started_at.isoformat(timespec="seconds"),
                     "Execution Completed At": completed_at.isoformat(
@@ -144,8 +147,14 @@ def _process_partitioned_dbt_assets(context: OpExecutionContext, dbt2: DbtCliRes
                     metadata=metadata,
                     output_name=dagster_event.output_name,
                 )
-
             yield dagster_event
+
+    # This function emits an AssetObservation with the dbt model's invocation ID and unique ID (needed for Snowflake Insights)
+    def handle_all_dbt_events(dbt_cli_task: DbtCliInvocation) -> Generator[Union[Output, AssetObservation, AssetCheckResult], None, None]:
+        for raw_event in dbt_cli_task.stream_raw_events():
+            yield from handle_dbt_event(raw_event)
+
+    yield from dbt_with_snowflake_insights(context, dbt_cli_task, dagster_events=handle_all_dbt_events(dbt_cli_task))
 
     if not dbt_cli_task.is_successful():
         raise Exception("dbt command failed, see preceding events")
diff --git a/hooli_snowflake_insights/Dockerfile b/hooli_snowflake_insights/Dockerfile
new file mode 100644
index 00000000..4ac0381f
--- /dev/null
+++ b/hooli_snowflake_insights/Dockerfile
@@ -0,0 +1,12 @@
+FROM python:3.10-slim
+
+WORKDIR /opt/dagster/app
+
+RUN apt-get update && apt-get install -y git
+
+RUN python -m pip uninstall oscrypto -y
+RUN python -m pip install git+https://github.com/wbond/oscrypto.git@d5f3437ed24257895ae1edd9e503cfb352e635a8
+
+ADD . .
+
+RUN pip install -r requirements.txt
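
Aside on the dbt_assets.py change above (a minimal sketch, not part of this patch): the nested handle_dbt_event / handle_all_dbt_events functions exist only to preserve the custom per-model timing metadata; when no extra metadata is needed, dbt_with_snowflake_insights can consume the dbt invocation directly. The asset function name and manifest path below are assumptions for illustration.

    from dagster import OpExecutionContext
    from dagster_cloud.dagster_insights import dbt_with_snowflake_insights
    from dagster_dbt import DbtCliResource, dbt_assets

    # Assumed path to the compiled dbt manifest; adjust for your project layout.
    DBT_MANIFEST_PATH = "dbt_project/target/manifest.json"

    @dbt_assets(manifest=DBT_MANIFEST_PATH)
    def my_dbt_assets(context: OpExecutionContext, dbt: DbtCliResource):
        dbt_cli_invocation = dbt.cli(["build"], context=context)
        # Translates dbt events into Dagster events and also emits the asset
        # observations (unique_id + invocation_id) that Snowflake Insights
        # joins against the query-comment opaque IDs.
        yield from dbt_with_snowflake_insights(context, dbt_cli_invocation)
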
diff --git a/hooli_snowflake_insights/definitions.py b/hooli_snowflake_insights/definitions.py
new file mode 100644
index 00000000..b13ae581
--- /dev/null
+++ b/hooli_snowflake_insights/definitions.py
@@ -0,0 +1,48 @@
+import os
+
+from dagster import Definitions, EnvVar, ResourceDefinition
+from dagster_cloud.dagster_insights import (
+    create_snowflake_insights_asset_and_schedule,
+)
+from dagster_snowflake import SnowflakeResource
+
+# Used to derive environment (LOCAL, BRANCH, PROD)
+def get_env():
+    if os.getenv("DAGSTER_CLOUD_IS_BRANCH_DEPLOYMENT", "") == "1":
+        return "BRANCH"
+    if os.getenv("DAGSTER_CLOUD_DEPLOYMENT_NAME", "") == "data-eng-prod":
+        return "PROD"
+    return "LOCAL"
+
+# Setting connection details by environment
+resource_def = {
+    "LOCAL": {
+        "snowflake_insights": ResourceDefinition.none_resource(),
+    },
+    "BRANCH": {
+        "snowflake_insights": SnowflakeResource(
+            account=EnvVar("SNOWFLAKE_ACCOUNT"),
+            user=EnvVar("SNOWFLAKE_USER"),
+            password=EnvVar("SNOWFLAKE_PASSWORD"),
+        ),
+    },
+    "PROD": {
+        "snowflake_insights": SnowflakeResource(
+            account=EnvVar("SNOWFLAKE_ACCOUNT"),
+            user=EnvVar("SNOWFLAKE_USER"),
+            password=EnvVar("SNOWFLAKE_PASSWORD"),
+        ),
+    },
+}
+
+# Creates an asset (poll_snowflake_query_history_hour) and sets its schedule
+snowflake_insights_definitions = create_snowflake_insights_asset_and_schedule(
+    "2023-10-29-00:00",
+    snowflake_resource_key="snowflake_insights",
+)
+
+defs = Definitions(
+    assets=[*snowflake_insights_definitions.assets],
+    schedules=[snowflake_insights_definitions.schedule],
+    resources=resource_def[get_env()],
+)
\ No newline at end of file
diff --git a/hooli_snowflake_insights/requirements.txt b/hooli_snowflake_insights/requirements.txt
new file mode 100644
index 00000000..57b6f6e4
--- /dev/null
+++ b/hooli_snowflake_insights/requirements.txt
@@ -0,0 +1,6 @@
+dagster
+dagster-cloud
+dagster-dbt
+dagster-snowflake
+gql
+requests_toolbelt
\ No newline at end of file
diff --git a/requirements.txt b/requirements.txt
index bbc193fd..12f8bd8d 100644
--- a/requirements.txt
+++ b/requirements.txt
@@ -43,4 +43,6 @@
 dbt-duckdb
 responses
 plotnine
-requests
\ No newline at end of file
+requests
+gql
+requests_toolbelt
\ No newline at end of file
diff --git a/setup.py b/setup.py
index 6d0067fe..1c2441be 100644
--- a/setup.py
+++ b/setup.py
@@ -23,9 +23,11 @@
         "dagster-pyspark",
         "dagster-databricks",
         "dagstermill",
+        "gql",
         "plotnine",
         "responses",
         "requests",
+        "requests_toolbelt",
         "html5lib",
         "scikit-learn"
     ],
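
A rough way to sanity-check the new code location before deploying (an illustrative sketch, not part of this patch; assumes it is run from inside hooli_snowflake_insights/ with the packages from its requirements.txt installed): get_env() should resolve to LOCAL, so the no-op Snowflake resource is used and no real connection is made.

    # smoke_test.py (hypothetical file, not in the repo)
    from definitions import get_env, snowflake_insights_definitions

    if __name__ == "__main__":
        print("resolved environment:", get_env())
        # Schedule and asset produced by create_snowflake_insights_asset_and_schedule;
        # assumes each entry in .assets is a standard Dagster AssetsDefinition.
        print("schedule:", snowflake_insights_definitions.schedule.name)
        for assets_def in snowflake_insights_definitions.assets:
            print("asset keys:", [str(key) for key in assets_def.keys])
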