-
Notifications
You must be signed in to change notification settings - Fork 15
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Update sling assets #92
Merged
Merged
Changes from 4 commits
Commits
Show all changes
5 commits
Select commit
Hold shift + click to select a range
f550709
updated sling assets to use the sling_assets decorator
izzye84 fd6c25f
updated the definitions to use my_sling_assets
izzye84 9905e28
modified the sling resource to use the updated APIs.
izzye84 50569fd
ignore duckdb files
izzye84 3024396
Update hooli-demo-assets/hooli_demo_assets/resources/__init__.py
izzye84 File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -172,5 +172,5 @@ cython_debug/ | |
tmp*/ | ||
|
||
# dbt | ||
dbt_project/example.duckdb | ||
dbt_project/example.duckdb* | ||
dbt_project/logs |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,12 +1,25 @@ | ||
from dagster import AssetSpec | ||
from dagster_embedded_elt.sling import build_sling_asset, SlingMode | ||
|
||
|
||
raw_location = build_sling_asset( | ||
asset_spec=AssetSpec(key=["locations"]), | ||
source_stream="s3://hooli-demo/embedded-elt/", | ||
target_object= "RAW_DATA.LOCATIONS", | ||
mode=SlingMode.FULL_REFRESH, | ||
source_options={"format": "csv"}, | ||
sling_resource_key="s3_to_snowflake_resource" | ||
) | ||
from dagster_embedded_elt.sling import ( | ||
sling_assets, | ||
SlingResource, | ||
) | ||
from dagster_embedded_elt.sling.dagster_sling_translator import DagsterSlingTranslator | ||
|
||
from hooli_demo_assets.resources import replication_config | ||
|
||
|
||
class CustomSlingTranslator(DagsterSlingTranslator): | ||
def __init__(self, target_prefix="RAW_DATA"): | ||
super().__init__(target_prefix=target_prefix) | ||
|
||
def get_group_name(self, stream_definition): | ||
return "RAW_DATA" | ||
|
||
|
||
@sling_assets( | ||
replication_config=replication_config, | ||
dagster_sling_translator=CustomSlingTranslator(), | ||
) | ||
def my_sling_assets(context, sling: SlingResource): | ||
yield from sling.replicate(context=context) | ||
for row in sling.stream_raw_logs(): | ||
context.log.info(row) |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,20 +1,18 @@ | ||
from dagster import ( | ||
Definitions, | ||
load_assets_from_modules, | ||
|
||
Definitions, | ||
) | ||
|
||
from hooli_demo_assets.assets import sling | ||
from hooli_demo_assets.assets.sling import my_sling_assets | ||
from hooli_demo_assets.jobs import daily_sling_job | ||
from hooli_demo_assets.resources import get_env, resource_def | ||
from hooli_demo_assets.schedules import daily_sling_assets | ||
from hooli_demo_assets.resources import sling_resource | ||
from hooli_demo_assets.schedules import daily_sling_assets | ||
|
||
|
||
sling_assets = load_assets_from_modules([sling], key_prefix="RAW_DATA", group_name="RAW_DATA") | ||
|
||
defs = Definitions( | ||
assets=[*sling_assets], | ||
schedules=[daily_sling_assets], | ||
jobs=[daily_sling_job], | ||
resources=resource_def[get_env()], | ||
) | ||
assets=[my_sling_assets], | ||
schedules=[daily_sling_assets], | ||
jobs=[daily_sling_job], | ||
resources={ | ||
"sling": sling_resource | ||
}, | ||
) |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,8 +1,10 @@ | ||
from dagster import AssetSelection, define_asset_job | ||
|
||
|
||
raw_location_by_day = AssetSelection.keys(["RAW_DATA", "locations"]) | ||
|
||
|
||
daily_sling_job = define_asset_job( | ||
name="daily_sling_job", | ||
selection=raw_location_by_day, | ||
) | ||
name="daily_sling_job", | ||
selection=raw_location_by_day, | ||
) |
139 changes: 86 additions & 53 deletions
139
hooli-demo-assets/hooli_demo_assets/resources/__init__.py
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,66 +1,99 @@ | ||
import os | ||
from pathlib import Path | ||
|
||
from dagster import EnvVar | ||
from pathlib import Path | ||
from dagster_embedded_elt.sling import ( | ||
SlingResource, | ||
SlingSourceConnection, | ||
SlingTargetConnection, | ||
SlingResource, | ||
SlingConnectionResource, | ||
) | ||
|
||
|
||
def get_env(): | ||
if os.getenv("DAGSTER_CLOUD_IS_BRANCH_DEPLOYMENT", "") == "1": | ||
return "BRANCH" | ||
if os.getenv("DAGSTER_CLOUD_DEPLOYMENT_NAME", "") == "data-eng-prod": | ||
return "PROD" | ||
return "LOCAL" | ||
if os.getenv("DAGSTER_CLOUD_IS_BRANCH_DEPLOYMENT", "") == "1": | ||
return "BRANCH" | ||
if os.getenv("DAGSTER_CLOUD_DEPLOYMENT_NAME", "") == "data-eng-prod": | ||
return "PROD" | ||
return "LOCAL" | ||
|
||
|
||
# embedded elt source and target | ||
# Path for duckdb connection - needed for local dev | ||
current_file_path = Path(__file__) | ||
parent_dir_path = current_file_path.parent.parent.parent.parent | ||
DUCKDB_PATH = parent_dir_path / "dbt_project" / "example.duckdb" | ||
|
||
source = SlingSourceConnection( | ||
type="s3", | ||
bucket=EnvVar("AWS_S3_BUCKET"), | ||
region=EnvVar("AWS_REGION"), | ||
access_key_id=EnvVar("AWS_ACCESS_KEY_ID"), | ||
secret_access_key=EnvVar("AWS_SECRET_ACCESS_KEY"), | ||
) | ||
local_target = SlingTargetConnection( | ||
type="duckdb", | ||
instance=f"{DUCKDB_PATH}", | ||
) | ||
branch_target = SlingTargetConnection( | ||
type="snowflake", | ||
host=EnvVar("SNOWFLAKE_HOST"), | ||
database="DEMO_DB2_BRANCH", | ||
user=EnvVar("SNOWFLAKE_USER"), | ||
password=EnvVar("SNOWFLAKE_PASSWORD"), | ||
role=EnvVar("SNOWFLAKE_ROLE"), | ||
) | ||
prod_target = SlingTargetConnection( | ||
type="snowflake", | ||
host=EnvVar("SNOWFLAKE_HOST"), | ||
database="DEMO_DB2", | ||
user=EnvVar("SNOWFLAKE_USER"), | ||
password=EnvVar("SNOWFLAKE_PASSWORD"), | ||
role=EnvVar("SNOWFLAKE_ROLE"), | ||
) | ||
|
||
resource_def = { | ||
"LOCAL": { | ||
"s3_to_snowflake_resource": | ||
SlingResource(source_connection=source, target_connection=local_target), | ||
}, | ||
"BRANCH": { | ||
"s3_to_snowflake_resource": SlingResource( | ||
source_connection=source, target_connection=branch_target | ||
), | ||
}, | ||
"PROD": { | ||
"s3_to_snowflake_resource": SlingResource( | ||
source_connection=source, target_connection=prod_target | ||
), | ||
}, | ||
} | ||
# Alternatively a replication.yaml file can be used | ||
def create_replication_config(env: str): | ||
# Determine the target database dynamically based on the environment | ||
target_database = { | ||
'LOCAL': 'DUCKDB', | ||
'BRANCH': 'SNOWFLAKE_BRANCH', | ||
'PROD': 'SNOWFLAKE_PROD' | ||
}.get(env, 'DUCKDB') # Default to DUCKDB if no environment matches | ||
|
||
replication_config = { | ||
"source": "S3", | ||
"target": target_database, | ||
"defaults": { | ||
"mode": "full-refresh", | ||
"object": "{stream_file_folder}.{stream_file_name}", | ||
"source_options": { | ||
"format": "csv" | ||
} | ||
}, | ||
"streams": { | ||
"s3://hooli-demo/embedded-elt/locations.csv": { | ||
"object": "locations" | ||
} | ||
} | ||
} | ||
return replication_config | ||
|
||
|
||
def create_sling_resource(env: str): | ||
# Dynamically generate connection based on enviornment | ||
connections = [ | ||
SlingConnectionResource( | ||
name="S3", | ||
type="s3", | ||
bucket=EnvVar("AWS_S3_BUCKET"), | ||
region=EnvVar("AWS_REGION"), | ||
access_key_id=EnvVar("AWS_ACCESS_KEY_ID"), | ||
secret_access_key=EnvVar("AWS_SECRET_ACCESS_KEY"), | ||
) | ||
] | ||
if env == 'LOCAL': | ||
connections.append(SlingConnectionResource( | ||
name="DUCKDB", | ||
type="duckdb", | ||
instance=f"{DUCKDB_PATH}", | ||
izzye84 marked this conversation as resolved.
Show resolved
Hide resolved
|
||
schema="RAW_DATA", | ||
)) | ||
elif env == 'BRANCH': | ||
connections.append(SlingConnectionResource( | ||
name="SNOWFLAKE_BRANCH", | ||
type="snowflake", | ||
host=EnvVar("SNOWFLAKE_HOST"), | ||
user=EnvVar("SNOWFLAKE_USER"), | ||
password=EnvVar("SNOWFLAKE_PASSWORD"), | ||
role=EnvVar("SNOWFLAKE_ROLE"), | ||
database="DEMO_DB2_BRANCH", | ||
schema="RAW_DATA", | ||
)) | ||
elif env == 'PROD': | ||
connections.append(SlingConnectionResource( | ||
name="SNOWFLAKE_PROD", | ||
type="snowflake", | ||
host=EnvVar("SNOWFLAKE_HOST"), | ||
user=EnvVar("SNOWFLAKE_USER"), | ||
password=EnvVar("SNOWFLAKE_PASSWORD"), | ||
role=EnvVar("SNOWFLAKE_ROLE"), | ||
database="DEMO_DB2", | ||
schema="RAW_DATA", | ||
)) | ||
return SlingResource(connections=connections) | ||
|
||
|
||
env = get_env() | ||
replication_config = create_replication_config(env) | ||
sling_resource = create_sling_resource(env) |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,7 +1,8 @@ | ||
from dagster import ScheduleDefinition | ||
from hooli_demo_assets.jobs import daily_sling_job | ||
|
||
|
||
daily_sling_assets = ScheduleDefinition( | ||
job=daily_sling_job, | ||
cron_schedule="0 0 * * *", # every day at midnight | ||
) | ||
job=daily_sling_job, | ||
cron_schedule="0 0 * * *", # every day at midnight | ||
) |
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Commenting out the schema will allow this to run locally—this is the bug I mentioned in the PR description.