Skip to content

Commit

Permalink
fixup
Browse files Browse the repository at this point in the history
  • Loading branch information
Ramshackle-Jamathon committed Sep 22, 2023
1 parent 412a4dc commit 18756f9
Show file tree
Hide file tree
Showing 5 changed files with 41 additions and 59 deletions.
8 changes: 0 additions & 8 deletions purina_usage/__init__.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
from dagster import (
Definitions,
EnvVar,
ScheduleDefinition,
define_asset_job,
fs_io_manager,
Expand All @@ -9,7 +8,6 @@
from dagster_snowflake_pandas import snowflake_pandas_io_manager

from purina_usage.assets import raw_data, usage, dbt_snowflake
from dagster_insights import DagsterInsightsResource

# import os
# from pathlib import Path
Expand Down Expand Up @@ -51,7 +49,6 @@
raw_job = define_asset_job("raw_job", selection=["raw_data/users", "raw_data/orders"])

from gql.transport.requests import RequestsHTTPTransport
from dagster_insights import DagsterInsightsResource

transport = RequestsHTTPTransport(
url="http://localhost:3000/test/staging/graphql",
Expand All @@ -76,11 +73,6 @@
"model_io_manager": fs_io_manager,
# this resource is used to execute dbt cli commands
"dbt": dbt_snowflake.dbt_cli_resource,
"dagster_insights": DagsterInsightsResource(
organization_id=EnvVar("DAGSTER_ORGANIZATION_ID"),
api_token=EnvVar("DAGSTER_API_TOKEN"),
url="http://localhost:3000/test/staging/graphql",
),
}


Expand Down
36 changes: 34 additions & 2 deletions purina_usage/assets/dbt_snowflake/__init__.py
Original file line number Diff line number Diff line change
@@ -1,2 +1,34 @@
from .blocking import blocking_dbt_snowflake_assets as blocking_dbt_snowflake_assets
from .nonblocking import nonblocking_dbt_snowflake_assets as nonblocking_dbt_snowflake_assets
import os
from pathlib import Path

from dagster_dbt import DbtCliResource, dbt_assets

from dagster import OpExecutionContext
from dagster_cloud.metrics import SnowflakeConnectionDetails, store_dbt_adapter_metrics

snowflake_connection_details = SnowflakeConnectionDetails(
user=os.getenv("SNOWFLAKE_USER", ""),
password=os.getenv("SNOWFLAKE_PASSWORD", ""),
account="na94824.us-east-1",
warehouse="DEVELOPMENT",
)

dbt_project_dir = Path(__file__).joinpath("..", "..", "..", "..", "dbt_project").resolve()
dbt_cli_resource = DbtCliResource(
project_dir=os.fspath(dbt_project_dir),
profiles_dir=os.fspath(dbt_project_dir.joinpath("config")),
target="staging",
)

dbt_parse_invocation = dbt_cli_resource.cli(["parse"]).wait()
dbt_manifest_path = dbt_parse_invocation.target_path.joinpath("manifest.json")


@dbt_assets(manifest=dbt_manifest_path)
def nonblocking_dbt_snowflake_assets(context: OpExecutionContext, dbt: DbtCliResource):
dbt_cli_invocation = dbt.cli(["build"], context=context)
yield from dbt_cli_invocation.stream()

run_results = dbt_cli_invocation.get_artifact("run_results.json")
manifest = dbt_cli_invocation.get_artifact("manifest.json")
store_dbt_adapter_metrics(context, manifest, run_results, snowflake_connection_details)
38 changes: 0 additions & 38 deletions purina_usage/assets/dbt_snowflake/blocking.py

This file was deleted.

Empty file.
18 changes: 7 additions & 11 deletions purina_usage/assets/raw_data/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,17 +3,15 @@
from dagster import asset
from typing import Generator, Any
from dagster import OpExecutionContext, Output
from dagster_cloud.metrics import put_context_metrics, DagsterInsightsMetric

from purina_usage.utils import random_data
from dagster_insights import DagsterInsightsResource, DagsterInsightsMetric


@asset(compute_kind="random")
def users(
context: OpExecutionContext, dagster_insights: DagsterInsightsResource
) -> Generator[Output[pd.DataFrame], Any, Any]:
def users(context: OpExecutionContext) -> Generator[Output[pd.DataFrame], Any, Any]:
"""A table containing all users data."""
dagster_insights.put_context_metrics(
put_context_metrics(
context,
metrics=[
DagsterInsightsMetric(
Expand All @@ -32,7 +30,7 @@ def users(
}
)
yield Output(data)
dagster_insights.put_context_metrics(
put_context_metrics(
context,
metrics=[
DagsterInsightsMetric(
Expand All @@ -44,12 +42,10 @@ def users(


@asset(compute_kind="random")
def orders(
context: OpExecutionContext, dagster_insights: DagsterInsightsResource
) -> Generator[Output[pd.DataFrame], Any, Any]:
def orders(context: OpExecutionContext) -> Generator[Output[pd.DataFrame], Any, Any]:
"""A table containing all orders that have been placed."""

dagster_insights.put_context_metrics(
put_context_metrics(
context,
metrics=[
DagsterInsightsMetric(
Expand All @@ -63,7 +59,7 @@ def orders(
n=10000,
)
yield Output(data)
dagster_insights.put_context_metrics(
put_context_metrics(
context,
metrics=[
DagsterInsightsMetric(
Expand Down

0 comments on commit 18756f9

Please sign in to comment.