From 32c8cd2afb6f7937392e3a57aa7cc45d21a6dfaa Mon Sep 17 00:00:00 2001 From: Charlie Bini <5003326+cbini@users.noreply.github.com> Date: Wed, 4 Dec 2024 18:50:03 +0000 Subject: [PATCH 1/3] feat: list to avro file --- .../kippcamden/deanslist/assets.py | 20 ++++----- .../kippnewark/deanslist/assets.py | 20 ++++----- src/teamster/libraries/deanslist/assets.py | 17 +++---- src/teamster/libraries/deanslist/resources.py | 45 +++++++++++++++++-- tests/assets/test_assets_deanslist.py | 22 ++++++--- 5 files changed, 84 insertions(+), 40 deletions(-) diff --git a/src/teamster/code_locations/kippcamden/deanslist/assets.py b/src/teamster/code_locations/kippcamden/deanslist/assets.py index 133cc5be20..4788ef2cd3 100644 --- a/src/teamster/code_locations/kippcamden/deanslist/assets.py +++ b/src/teamster/code_locations/kippcamden/deanslist/assets.py @@ -90,16 +90,16 @@ api_version="v1", schema=BEHAVIOR_SCHEMA, partitions_def=DEANSLIST_FISCAL_MULTI_PARTITIONS_DEF, - op_tags={ - "dagster-k8s/config": { - "container_config": { - "resources": { - "requests": {"memory": "0.5Gi"}, - "limits": {"memory": "6.0Gi"}, - } - } - } - }, + # op_tags={ + # "dagster-k8s/config": { + # "container_config": { + # "resources": { + # "requests": {"memory": "0.5Gi"}, + # "limits": {"memory": "6.0Gi"}, + # } + # } + # } + # }, ) fiscal_multi_partitions_assets = [behavior, *fiscal_multi_partitions_assets] diff --git a/src/teamster/code_locations/kippnewark/deanslist/assets.py b/src/teamster/code_locations/kippnewark/deanslist/assets.py index 0709e3ede5..9ade320a92 100644 --- a/src/teamster/code_locations/kippnewark/deanslist/assets.py +++ b/src/teamster/code_locations/kippnewark/deanslist/assets.py @@ -90,16 +90,16 @@ api_version="v1", schema=BEHAVIOR_SCHEMA, partitions_def=DEANSLIST_FISCAL_MULTI_PARTITIONS_DEF, - op_tags={ - "dagster-k8s/config": { - "container_config": { - "resources": { - "requests": {"cpu": "250m", "memory": "0.5Gi"}, - "limits": {"cpu": "1250m", "memory": "4.0Gi"}, - } - } - } - }, + # op_tags={ + # "dagster-k8s/config": { + # "container_config": { + # "resources": { + # "requests": {"cpu": "250m", "memory": "0.5Gi"}, + # "limits": {"cpu": "1250m", "memory": "4.0Gi"}, + # } + # } + # } + # }, ) fiscal_multi_partitions_assets = [behavior, *fiscal_multi_partitions_assets] diff --git a/src/teamster/libraries/deanslist/assets.py b/src/teamster/libraries/deanslist/assets.py index fa62c45b5b..f44ba034cb 100644 --- a/src/teamster/libraries/deanslist/assets.py +++ b/src/teamster/libraries/deanslist/assets.py @@ -143,7 +143,7 @@ def build_deanslist_paginated_multi_partition_asset( code_location: str, endpoint: str, api_version: str, - schema, + schema: dict, partitions_def: MultiPartitionsDefinition, op_tags: dict | None = None, params: dict | None = None, @@ -151,17 +151,14 @@ def build_deanslist_paginated_multi_partition_asset( if params is None: params = {} - asset_key = [code_location, "deanslist", "behavior"] - @asset( - key=asset_key, + key=[code_location, "deanslist", "behavior"], metadata=params, - io_manager_key="io_manager_gcs_avro", + io_manager_key="io_manager_gcs_file", partitions_def=partitions_def, op_tags=op_tags, group_name="deanslist", kinds={"python"}, - check_specs=[build_check_spec_avro_schema_valid(asset_key)], ) def _asset(context: AssetExecutionContext, deanslist: DeansListResource): partition_key = _check.inst(obj=context.partition_key, ttype=MultiPartitionKey) @@ -172,9 +169,10 @@ def _asset(context: AssetExecutionContext, deanslist: DeansListResource): date_partition_key_fy = FiscalYear(datetime=date_partition_key, start_month=7) - total_count, data = deanslist.list( + total_count, data_filepath = deanslist.list( api_version=api_version, endpoint=endpoint, + avro_schema=schema, school_id=int(partition_key.keys_by_dimension["school"]), params={ "UpdatedSince": date_partition_key.date().isoformat(), @@ -184,9 +182,6 @@ def _asset(context: AssetExecutionContext, deanslist: DeansListResource): }, ) - yield Output(value=(data, schema), metadata={"records": total_count}) - yield check_avro_schema_valid( - asset_key=context.asset_key, records=data, schema=schema - ) + yield Output(value=data_filepath, metadata={"records": total_count}) return _asset diff --git a/src/teamster/libraries/deanslist/resources.py b/src/teamster/libraries/deanslist/resources.py index 7b26219f95..1f62f7766e 100644 --- a/src/teamster/libraries/deanslist/resources.py +++ b/src/teamster/libraries/deanslist/resources.py @@ -1,3 +1,7 @@ +import pathlib + +import fastavro +import fastavro.types import yaml from dagster import ConfigurableResource, DagsterLogManager, InitResourceContext, _check from pydantic import PrivateAttr @@ -99,16 +103,35 @@ def list( school_id: int, params: dict, page_size: int = 250000, + avro_schema: fastavro.types.Schema | None = None, *args, **kwargs, ): page: int = 1 total_pages: int = 2 total_count: int = 0 - data: list[dict] = [] + all_data: list[dict] = [] + + data_filepath = pathlib.Path( + f"env/deanslist/{endpoint}/{params["UpdatedSince"]}/{school_id}/data.avro" + ).absolute() url = self._get_url(*args, api_version=api_version, endpoint=endpoint) + if avro_schema is not None: + data_filepath.parent.mkdir(parents=True, exist_ok=True) + + with data_filepath.open("wb") as fo: + fastavro.writer( + fo=fo, + schema=avro_schema, + records=[], + codec="snappy", + strict_allow_default=True, + ) + + fo = data_filepath.open("a+b") + while page <= total_pages: params.update({"page_size": page_size, "page": page}) @@ -118,8 +141,24 @@ def list( total_count = response_json["total_count"] total_pages = response_json["total_pages"] - data.extend(response_json["data"]) + data = response_json["data"] + + if avro_schema is not None: + fastavro.writer( + fo=fo, + schema=avro_schema, + records=data, + codec="snappy", + strict_allow_default=True, + ) + else: + all_data.extend(data) page += 1 - return int(total_count), data + fo.close() + + if avro_schema is not None: + return int(total_count), data_filepath + else: + return int(total_count), all_data diff --git a/tests/assets/test_assets_deanslist.py b/tests/assets/test_assets_deanslist.py index 025093d8bb..dcafa8e7d3 100644 --- a/tests/assets/test_assets_deanslist.py +++ b/tests/assets/test_assets_deanslist.py @@ -3,7 +3,11 @@ from dagster import TextMetadataValue, _check, materialize from dagster._core.events import StepMaterializationData -from teamster.core.resources import DEANSLIST_RESOURCE, get_io_manager_gcs_avro +from teamster.core.resources import ( + DEANSLIST_RESOURCE, + get_io_manager_gcs_avro, + get_io_manager_gcs_file, +) def _test_asset(assets, asset_name, partition_key: str | None = None): @@ -21,6 +25,9 @@ def _test_asset(assets, asset_name, partition_key: str | None = None): "io_manager_gcs_avro": get_io_manager_gcs_avro( code_location="test", test=True ), + "io_manager_gcs_file": get_io_manager_gcs_file( + code_location="test", test=True + ), "deanslist": DEANSLIST_RESOURCE, }, ) @@ -38,12 +45,15 @@ def _test_asset(assets, asset_name, partition_key: str | None = None): assert records > 0 - extras = _check.inst( - obj=result.get_asset_check_evaluations()[0].metadata.get("extras"), - ttype=TextMetadataValue, - ) + asset_check_evaluations = result.get_asset_check_evaluations() + + if asset_check_evaluations: + extras = _check.inst( + obj=asset_check_evaluations[0].metadata.get("extras"), + ttype=TextMetadataValue, + ) - assert extras.text == "" + assert extras.text == "" def test_asset_deanslist_lists_kippnewark(): From d9188a9d6f4c9e271d8b34387b5420434e4d65f0 Mon Sep 17 00:00:00 2001 From: Charlie Bini <5003326+cbini@users.noreply.github.com> Date: Wed, 4 Dec 2024 20:31:23 +0000 Subject: [PATCH 2/3] feat: io mgr branch deployment --- .../code_locations/kippcamden/deanslist/assets.py | 10 ---------- .../code_locations/kippnewark/deanslist/assets.py | 10 ---------- src/teamster/core/resources.py | 13 +++++++++++++ 3 files changed, 13 insertions(+), 20 deletions(-) diff --git a/src/teamster/code_locations/kippcamden/deanslist/assets.py b/src/teamster/code_locations/kippcamden/deanslist/assets.py index a88336c230..ada7a49396 100644 --- a/src/teamster/code_locations/kippcamden/deanslist/assets.py +++ b/src/teamster/code_locations/kippcamden/deanslist/assets.py @@ -90,16 +90,6 @@ api_version="v1", schema=BEHAVIOR_SCHEMA, partitions_def=DEANSLIST_FISCAL_MULTI_PARTITIONS_DEF, - # op_tags={ - # "dagster-k8s/config": { - # "container_config": { - # "resources": { - # "requests": {"memory": "0.5Gi"}, - # "limits": {"memory": "6.5Gi"}, - # } - # } - # } - # }, ) fiscal_multi_partitions_assets = [behavior, *fiscal_multi_partitions_assets] diff --git a/src/teamster/code_locations/kippnewark/deanslist/assets.py b/src/teamster/code_locations/kippnewark/deanslist/assets.py index 9ade320a92..91781ebafa 100644 --- a/src/teamster/code_locations/kippnewark/deanslist/assets.py +++ b/src/teamster/code_locations/kippnewark/deanslist/assets.py @@ -90,16 +90,6 @@ api_version="v1", schema=BEHAVIOR_SCHEMA, partitions_def=DEANSLIST_FISCAL_MULTI_PARTITIONS_DEF, - # op_tags={ - # "dagster-k8s/config": { - # "container_config": { - # "resources": { - # "requests": {"cpu": "250m", "memory": "0.5Gi"}, - # "limits": {"cpu": "1250m", "memory": "4.0Gi"}, - # } - # } - # } - # }, ) fiscal_multi_partitions_assets = [behavior, *fiscal_multi_partitions_assets] diff --git a/src/teamster/core/resources.py b/src/teamster/core/resources.py index 2d7b14de7b..3172dbff39 100644 --- a/src/teamster/core/resources.py +++ b/src/teamster/core/resources.py @@ -1,3 +1,5 @@ +import os + from dagster import EnvVar from dagster_dbt import DbtCliResource from dagster_gcp import BigQueryResource, GCSResource @@ -71,12 +73,19 @@ def get_io_manager_gcs_pickle(code_location): + if os.getenv("DAGSTER_CLOUD_IS_BRANCH_DEPLOYMENT") == "1": + code_location = "test" + return GCSIOManager( gcs=GCS_RESOURCE, gcs_bucket=f"teamster-{code_location}", object_type="pickle" ) def get_io_manager_gcs_avro(code_location, test=False): + if os.getenv("DAGSTER_CLOUD_IS_BRANCH_DEPLOYMENT") == "1": + code_location = "test" + test = True + return GCSIOManager( gcs=GCS_RESOURCE, gcs_bucket=f"teamster-{code_location}", @@ -86,6 +95,10 @@ def get_io_manager_gcs_avro(code_location, test=False): def get_io_manager_gcs_file(code_location, test=False): + if os.getenv("DAGSTER_CLOUD_IS_BRANCH_DEPLOYMENT") == "1": + code_location = "test" + test = True + return GCSIOManager( gcs=GCS_RESOURCE, gcs_bucket=f"teamster-{code_location}", From eab8d4224bafc3788a7e2a1ec616100f534dd0f1 Mon Sep 17 00:00:00 2001 From: Charlie Bini <5003326+cbini@users.noreply.github.com> Date: Wed, 4 Dec 2024 21:20:33 +0000 Subject: [PATCH 3/3] feat: add incidents_attachments, dbt dev schema --- .../staging/stg_deanslist__incidents.sql | 1 + .../stg_deanslist__incidents__attachments.sql | 32 +++++++++++++++++++ .../models/amplify/dds/sources-external.yml | 7 ++-- .../models/amplify/dibels/sources-drive.yml | 7 ++-- .../amplify/mclass/sources-external.yml | 7 ++-- .../models/deanslist/sources-kippcamden.yml | 12 +++++++ .../models/deanslist/sources-kippmiami.yml | 12 +++++++ .../models/deanslist/sources-kippnewark.yml | 12 +++++++ .../stg_deanslist__incidents__attachments.sql | 9 ++++++ .../models/iready/sources-kippmiami.yml | 7 ++-- .../kipptaf/models/iready/sources-kippnj.yml | 7 ++-- .../stg_iready__instruction_by_lesson_pro.sql | 4 +-- 12 files changed, 100 insertions(+), 17 deletions(-) create mode 100644 src/dbt/deanslist/models/staging/stg_deanslist__incidents__attachments.sql create mode 100644 src/dbt/kipptaf/models/deanslist/staging/stg_deanslist__incidents__attachments.sql diff --git a/src/dbt/deanslist/models/staging/stg_deanslist__incidents.sql b/src/dbt/deanslist/models/staging/stg_deanslist__incidents.sql index 9922c8692e..5213bcae4f 100644 --- a/src/dbt/deanslist/models/staging/stg_deanslist__incidents.sql +++ b/src/dbt/deanslist/models/staging/stg_deanslist__incidents.sql @@ -29,6 +29,7 @@ select actions, custom_fields, penalties, + attachments, /* transformations */ nullif(addlreqs, '') as addl_reqs, diff --git a/src/dbt/deanslist/models/staging/stg_deanslist__incidents__attachments.sql b/src/dbt/deanslist/models/staging/stg_deanslist__incidents__attachments.sql new file mode 100644 index 0000000000..9927eb18fa --- /dev/null +++ b/src/dbt/deanslist/models/staging/stg_deanslist__incidents__attachments.sql @@ -0,0 +1,32 @@ +select + i.incident_id, + + a.attachmenttype as attachment_type, + a.contenttype as content_type, + a.entityname as entity_name, + a.entitytype as entity_type, + a.internalfilename as internal_filename, + a.internalfolder as internal_folder, + a.minuserlevel as min_user_level, + a.minuserlevelgroupname as min_user_level_group_name, + a.publicfilename as public_filename, + a.reporttype as report_type, + a.sourcetype as source_type, + a.url, + + a.filepostedat.timezone as file_posted_at__timezone, + a.filepostedat.timezone_type as file_posted_at__timezone_type, + + cast(a.attachmentid as int) as attachment_id, + cast(a.bytes as int) as bytes, + cast(a.entityid as int) as entity_id, + cast(a.schoolid as int) as school_id, + cast(a.sourceid as int) as source_id, + cast(a.studentid as int) as student_id, + cast(a.termid as int) as term_id, + + cast(a.reportdate as date) as report_date, + + cast(a.filepostedat.`date` as datetime) as file_posted_at__date, +from {{ ref("stg_deanslist__incidents") }} as i +cross join unnest(i.attachments) as a diff --git a/src/dbt/kipptaf/models/amplify/dds/sources-external.yml b/src/dbt/kipptaf/models/amplify/dds/sources-external.yml index 19b75880c0..42e877118d 100644 --- a/src/dbt/kipptaf/models/amplify/dds/sources-external.yml +++ b/src/dbt/kipptaf/models/amplify/dds/sources-external.yml @@ -2,9 +2,10 @@ version: 2 sources: - name: amplify - schema: - "{% if env_var('DBT_DEV', '') == 'true' %}_dev_kipptaf_amplify{% else %}kipptaf_amplify{% - endif %}" + schema: | + {% if env_var('DBT_DEV', '') == 'true' -%}_dev_kipptaf_amplify + {%- else -%}kipptaf_amplify + {%- endif %} tags: - stage_external_sources tables: diff --git a/src/dbt/kipptaf/models/amplify/dibels/sources-drive.yml b/src/dbt/kipptaf/models/amplify/dibels/sources-drive.yml index 05854169e4..75c6be0127 100644 --- a/src/dbt/kipptaf/models/amplify/dibels/sources-drive.yml +++ b/src/dbt/kipptaf/models/amplify/dibels/sources-drive.yml @@ -2,9 +2,10 @@ version: 2 sources: - name: amplify - schema: - "{% if env_var('DBT_DEV', '') == 'true' %}_dev_kipptaf_amplify{% else %}kipptaf_amplify{% - endif %}" + schema: | + {% if env_var('DBT_DEV', '') == 'true' -%}_dev_kipptaf_amplify + {%- else -%}kipptaf_amplify + {%- endif %} tags: - stage_external_sources tables: diff --git a/src/dbt/kipptaf/models/amplify/mclass/sources-external.yml b/src/dbt/kipptaf/models/amplify/mclass/sources-external.yml index 29b55fd512..6075da7f1f 100644 --- a/src/dbt/kipptaf/models/amplify/mclass/sources-external.yml +++ b/src/dbt/kipptaf/models/amplify/mclass/sources-external.yml @@ -2,9 +2,10 @@ version: 2 sources: - name: amplify - schema: - "{% if env_var('DBT_DEV', '') == 'true' %}_dev_kipptaf_amplify{% else %}kipptaf_amplify{% - endif %}" + schema: | + {% if env_var('DBT_DEV', '') == 'true' -%}_dev_kipptaf_amplify + {%- else -%}kipptaf_amplify + {%- endif %} tags: - stage_external_sources tables: diff --git a/src/dbt/kipptaf/models/deanslist/sources-kippcamden.yml b/src/dbt/kipptaf/models/deanslist/sources-kippcamden.yml index 54d3dfc14f..0cea91bbe9 100644 --- a/src/dbt/kipptaf/models/deanslist/sources-kippcamden.yml +++ b/src/dbt/kipptaf/models/deanslist/sources-kippcamden.yml @@ -2,6 +2,10 @@ version: 2 sources: - name: kippcamden_deanslist + schema: | + {% if env_var('DBT_DEV', '') == 'true' -%}_dev_kippcamden_deanslist + {%- else -%}kippcamden_deanslist + {%- endif %} tables: - name: stg_deanslist__behavior meta: @@ -139,3 +143,11 @@ sources: - kippcamden - deanslist - stg_deanslist__dff_stats + - name: stg_deanslist__incidents__attachments + meta: + dagster: + group: deanslist + asset_key: + - kippcamden + - deanslist + - stg_deanslist__incidents__attachments diff --git a/src/dbt/kipptaf/models/deanslist/sources-kippmiami.yml b/src/dbt/kipptaf/models/deanslist/sources-kippmiami.yml index 1c747feba4..cd570b28d1 100644 --- a/src/dbt/kipptaf/models/deanslist/sources-kippmiami.yml +++ b/src/dbt/kipptaf/models/deanslist/sources-kippmiami.yml @@ -2,6 +2,10 @@ version: 2 sources: - name: kippmiami_deanslist + schema: | + {% if env_var('DBT_DEV', '') == 'true' -%}_dev_kippmiami_deanslist + {%- else -%}kippmiami_deanslist + {%- endif %} tables: - name: stg_deanslist__behavior meta: @@ -139,3 +143,11 @@ sources: - kippmiami - deanslist - stg_deanslist__dff_stats + - name: stg_deanslist__incidents__attachments + meta: + dagster: + group: deanslist + asset_key: + - kippmiami + - deanslist + - stg_deanslist__incidents__attachments diff --git a/src/dbt/kipptaf/models/deanslist/sources-kippnewark.yml b/src/dbt/kipptaf/models/deanslist/sources-kippnewark.yml index 6860a2ca1b..1daa4a167c 100644 --- a/src/dbt/kipptaf/models/deanslist/sources-kippnewark.yml +++ b/src/dbt/kipptaf/models/deanslist/sources-kippnewark.yml @@ -2,6 +2,10 @@ version: 2 sources: - name: kippnewark_deanslist + schema: | + {% if env_var('DBT_DEV', '') == 'true' -%}_dev_kippnewark_deanslist + {%- else -%}kippnewark_deanslist + {%- endif %} tables: - name: stg_deanslist__behavior meta: @@ -139,3 +143,11 @@ sources: - kippnewark - deanslist - stg_deanslist__dff_stats + - name: stg_deanslist__incidents__attachments + meta: + dagster: + group: deanslist + asset_key: + - kippnewark + - deanslist + - stg_deanslist__incidents__attachments diff --git a/src/dbt/kipptaf/models/deanslist/staging/stg_deanslist__incidents__attachments.sql b/src/dbt/kipptaf/models/deanslist/staging/stg_deanslist__incidents__attachments.sql new file mode 100644 index 0000000000..73af51d0be --- /dev/null +++ b/src/dbt/kipptaf/models/deanslist/staging/stg_deanslist__incidents__attachments.sql @@ -0,0 +1,9 @@ +{{ + dbt_utils.union_relations( + relations=[ + source("kippnewark_deanslist", "stg_deanslist__incidents__attachments"), + source("kippcamden_deanslist", "stg_deanslist__incidents__attachments"), + source("kippmiami_deanslist", "stg_deanslist__incidents__attachments"), + ] + ) +}} diff --git a/src/dbt/kipptaf/models/iready/sources-kippmiami.yml b/src/dbt/kipptaf/models/iready/sources-kippmiami.yml index 57e915974b..861ed611a1 100644 --- a/src/dbt/kipptaf/models/iready/sources-kippmiami.yml +++ b/src/dbt/kipptaf/models/iready/sources-kippmiami.yml @@ -2,9 +2,10 @@ version: 2 sources: - name: kippmiami_iready - schema: - "{% if env_var('DBT_DEV', '') == 'true' %}_dev_kippmiami_iready{% else %}kippmiami_iready{% - endif %}" + schema: | + {% if env_var('DBT_DEV', '') == 'true' -%}_dev_kippmiami_iready + {%- else -%}kippmiami_iready + {%- endif %} tables: - name: stg_iready__diagnostic_results meta: diff --git a/src/dbt/kipptaf/models/iready/sources-kippnj.yml b/src/dbt/kipptaf/models/iready/sources-kippnj.yml index 2302d542c9..4815246f45 100644 --- a/src/dbt/kipptaf/models/iready/sources-kippnj.yml +++ b/src/dbt/kipptaf/models/iready/sources-kippnj.yml @@ -2,9 +2,10 @@ version: 2 sources: - name: kippnj_iready - schema: - "{% if env_var('DBT_DEV', '') == 'true' %}_dev_kippnewark_iready{% else %}kippnewark_iready{% - endif %}" + schema: | + {% if env_var('DBT_DEV', '') == 'true' -%}_dev_kippnewark_iready + {%- else -%}kippnewark_iready + {%- endif %} tables: - name: stg_iready__diagnostic_results meta: diff --git a/src/dbt/kipptaf/models/iready/staging/stg_iready__instruction_by_lesson_pro.sql b/src/dbt/kipptaf/models/iready/staging/stg_iready__instruction_by_lesson_pro.sql index e6a879643b..f9cc98bfaa 100644 --- a/src/dbt/kipptaf/models/iready/staging/stg_iready__instruction_by_lesson_pro.sql +++ b/src/dbt/kipptaf/models/iready/staging/stg_iready__instruction_by_lesson_pro.sql @@ -1,8 +1,8 @@ {{ dbt_utils.union_relations( relations=[ - source("kippnj_iready", model.name), - source("kippmiami_iready", model.name), + source("kippnj_iready", "stg_iready__instruction_by_lesson_pro"), + source("kippmiami_iready", "stg_iready__instruction_by_lesson_pro"), ] ) }}