Skip to content

Commit

Permalink
Merge pull request #2263 from TEAMSchools/2233-warehouse-new-deanslis…
Browse files Browse the repository at this point in the history
…t-incident-api-fields-re-attached-documents

feat: list to avro file
  • Loading branch information
cbini authored Dec 4, 2024
2 parents 74f4494 + eab8d42 commit f18cc0f
Show file tree
Hide file tree
Showing 18 changed files with 177 additions and 57 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ select
actions,
custom_fields,
penalties,
attachments,

/* transformations */
nullif(addlreqs, '') as addl_reqs,
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
select
i.incident_id,

a.attachmenttype as attachment_type,
a.contenttype as content_type,
a.entityname as entity_name,
a.entitytype as entity_type,
a.internalfilename as internal_filename,
a.internalfolder as internal_folder,
a.minuserlevel as min_user_level,
a.minuserlevelgroupname as min_user_level_group_name,
a.publicfilename as public_filename,
a.reporttype as report_type,
a.sourcetype as source_type,
a.url,

a.filepostedat.timezone as file_posted_at__timezone,
a.filepostedat.timezone_type as file_posted_at__timezone_type,

cast(a.attachmentid as int) as attachment_id,
cast(a.bytes as int) as bytes,
cast(a.entityid as int) as entity_id,
cast(a.schoolid as int) as school_id,
cast(a.sourceid as int) as source_id,
cast(a.studentid as int) as student_id,
cast(a.termid as int) as term_id,

cast(a.reportdate as date) as report_date,

cast(a.filepostedat.`date` as datetime) as file_posted_at__date,
from {{ ref("stg_deanslist__incidents") }} as i
cross join unnest(i.attachments) as a
7 changes: 4 additions & 3 deletions src/dbt/kipptaf/models/amplify/dds/sources-external.yml
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,10 @@ version: 2

sources:
- name: amplify
schema:
"{% if env_var('DBT_DEV', '') == 'true' %}_dev_kipptaf_amplify{% else %}kipptaf_amplify{%
endif %}"
schema: |
{% if env_var('DBT_DEV', '') == 'true' -%}_dev_kipptaf_amplify
{%- else -%}kipptaf_amplify
{%- endif %}
tags:
- stage_external_sources
tables:
Expand Down
7 changes: 4 additions & 3 deletions src/dbt/kipptaf/models/amplify/dibels/sources-drive.yml
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,10 @@ version: 2

sources:
- name: amplify
schema:
"{% if env_var('DBT_DEV', '') == 'true' %}_dev_kipptaf_amplify{% else %}kipptaf_amplify{%
endif %}"
schema: |
{% if env_var('DBT_DEV', '') == 'true' -%}_dev_kipptaf_amplify
{%- else -%}kipptaf_amplify
{%- endif %}
tags:
- stage_external_sources
tables:
Expand Down
7 changes: 4 additions & 3 deletions src/dbt/kipptaf/models/amplify/mclass/sources-external.yml
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,10 @@ version: 2

sources:
- name: amplify
schema:
"{% if env_var('DBT_DEV', '') == 'true' %}_dev_kipptaf_amplify{% else %}kipptaf_amplify{%
endif %}"
schema: |
{% if env_var('DBT_DEV', '') == 'true' -%}_dev_kipptaf_amplify
{%- else -%}kipptaf_amplify
{%- endif %}
tags:
- stage_external_sources
tables:
Expand Down
12 changes: 12 additions & 0 deletions src/dbt/kipptaf/models/deanslist/sources-kippcamden.yml
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,10 @@ version: 2

sources:
- name: kippcamden_deanslist
schema: |
{% if env_var('DBT_DEV', '') == 'true' -%}_dev_kippcamden_deanslist
{%- else -%}kippcamden_deanslist
{%- endif %}
tables:
- name: stg_deanslist__behavior
meta:
Expand Down Expand Up @@ -139,3 +143,11 @@ sources:
- kippcamden
- deanslist
- stg_deanslist__dff_stats
- name: stg_deanslist__incidents__attachments
meta:
dagster:
group: deanslist
asset_key:
- kippcamden
- deanslist
- stg_deanslist__incidents__attachments
12 changes: 12 additions & 0 deletions src/dbt/kipptaf/models/deanslist/sources-kippmiami.yml
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,10 @@ version: 2

sources:
- name: kippmiami_deanslist
schema: |
{% if env_var('DBT_DEV', '') == 'true' -%}_dev_kippmiami_deanslist
{%- else -%}kippmiami_deanslist
{%- endif %}
tables:
- name: stg_deanslist__behavior
meta:
Expand Down Expand Up @@ -139,3 +143,11 @@ sources:
- kippmiami
- deanslist
- stg_deanslist__dff_stats
- name: stg_deanslist__incidents__attachments
meta:
dagster:
group: deanslist
asset_key:
- kippmiami
- deanslist
- stg_deanslist__incidents__attachments
12 changes: 12 additions & 0 deletions src/dbt/kipptaf/models/deanslist/sources-kippnewark.yml
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,10 @@ version: 2

sources:
- name: kippnewark_deanslist
schema: |
{% if env_var('DBT_DEV', '') == 'true' -%}_dev_kippnewark_deanslist
{%- else -%}kippnewark_deanslist
{%- endif %}
tables:
- name: stg_deanslist__behavior
meta:
Expand Down Expand Up @@ -139,3 +143,11 @@ sources:
- kippnewark
- deanslist
- stg_deanslist__dff_stats
- name: stg_deanslist__incidents__attachments
meta:
dagster:
group: deanslist
asset_key:
- kippnewark
- deanslist
- stg_deanslist__incidents__attachments
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
{{
dbt_utils.union_relations(
relations=[
source("kippnewark_deanslist", "stg_deanslist__incidents__attachments"),
source("kippcamden_deanslist", "stg_deanslist__incidents__attachments"),
source("kippmiami_deanslist", "stg_deanslist__incidents__attachments"),
]
)
}}
7 changes: 4 additions & 3 deletions src/dbt/kipptaf/models/iready/sources-kippmiami.yml
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,10 @@ version: 2

sources:
- name: kippmiami_iready
schema:
"{% if env_var('DBT_DEV', '') == 'true' %}_dev_kippmiami_iready{% else %}kippmiami_iready{%
endif %}"
schema: |
{% if env_var('DBT_DEV', '') == 'true' -%}_dev_kippmiami_iready
{%- else -%}kippmiami_iready
{%- endif %}
tables:
- name: stg_iready__diagnostic_results
meta:
Expand Down
7 changes: 4 additions & 3 deletions src/dbt/kipptaf/models/iready/sources-kippnj.yml
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,10 @@ version: 2

sources:
- name: kippnj_iready
schema:
"{% if env_var('DBT_DEV', '') == 'true' %}_dev_kippnewark_iready{% else %}kippnewark_iready{%
endif %}"
schema: |
{% if env_var('DBT_DEV', '') == 'true' -%}_dev_kippnewark_iready
{%- else -%}kippnewark_iready
{%- endif %}
tables:
- name: stg_iready__diagnostic_results
meta:
Expand Down
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
{{
dbt_utils.union_relations(
relations=[
source("kippnj_iready", model.name),
source("kippmiami_iready", model.name),
source("kippnj_iready", "stg_iready__instruction_by_lesson_pro"),
source("kippmiami_iready", "stg_iready__instruction_by_lesson_pro"),
]
)
}}
10 changes: 0 additions & 10 deletions src/teamster/code_locations/kippcamden/deanslist/assets.py
Original file line number Diff line number Diff line change
Expand Up @@ -90,16 +90,6 @@
api_version="v1",
schema=BEHAVIOR_SCHEMA,
partitions_def=DEANSLIST_FISCAL_MULTI_PARTITIONS_DEF,
op_tags={
"dagster-k8s/config": {
"container_config": {
"resources": {
"requests": {"memory": "0.5Gi"},
"limits": {"memory": "6.5Gi"},
}
}
}
},
)

fiscal_multi_partitions_assets = [behavior, *fiscal_multi_partitions_assets]
Expand Down
10 changes: 0 additions & 10 deletions src/teamster/code_locations/kippnewark/deanslist/assets.py
Original file line number Diff line number Diff line change
Expand Up @@ -90,16 +90,6 @@
api_version="v1",
schema=BEHAVIOR_SCHEMA,
partitions_def=DEANSLIST_FISCAL_MULTI_PARTITIONS_DEF,
op_tags={
"dagster-k8s/config": {
"container_config": {
"resources": {
"requests": {"cpu": "250m", "memory": "0.5Gi"},
"limits": {"cpu": "1250m", "memory": "4.0Gi"},
}
}
}
},
)

fiscal_multi_partitions_assets = [behavior, *fiscal_multi_partitions_assets]
Expand Down
13 changes: 13 additions & 0 deletions src/teamster/core/resources.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
import os

from dagster import EnvVar
from dagster_dbt import DbtCliResource
from dagster_gcp import BigQueryResource, GCSResource
Expand Down Expand Up @@ -71,12 +73,19 @@


def get_io_manager_gcs_pickle(code_location):
if os.getenv("DAGSTER_CLOUD_IS_BRANCH_DEPLOYMENT") == "1":
code_location = "test"

return GCSIOManager(
gcs=GCS_RESOURCE, gcs_bucket=f"teamster-{code_location}", object_type="pickle"
)


def get_io_manager_gcs_avro(code_location, test=False):
if os.getenv("DAGSTER_CLOUD_IS_BRANCH_DEPLOYMENT") == "1":
code_location = "test"
test = True

return GCSIOManager(
gcs=GCS_RESOURCE,
gcs_bucket=f"teamster-{code_location}",
Expand All @@ -86,6 +95,10 @@ def get_io_manager_gcs_avro(code_location, test=False):


def get_io_manager_gcs_file(code_location, test=False):
if os.getenv("DAGSTER_CLOUD_IS_BRANCH_DEPLOYMENT") == "1":
code_location = "test"
test = True

return GCSIOManager(
gcs=GCS_RESOURCE,
gcs_bucket=f"teamster-{code_location}",
Expand Down
17 changes: 6 additions & 11 deletions src/teamster/libraries/deanslist/assets.py
Original file line number Diff line number Diff line change
Expand Up @@ -143,25 +143,22 @@ def build_deanslist_paginated_multi_partition_asset(
code_location: str,
endpoint: str,
api_version: str,
schema,
schema: dict,
partitions_def: MultiPartitionsDefinition,
op_tags: dict | None = None,
params: dict | None = None,
) -> AssetsDefinition:
if params is None:
params = {}

asset_key = [code_location, "deanslist", "behavior"]

@asset(
key=asset_key,
key=[code_location, "deanslist", "behavior"],
metadata=params,
io_manager_key="io_manager_gcs_avro",
io_manager_key="io_manager_gcs_file",
partitions_def=partitions_def,
op_tags=op_tags,
group_name="deanslist",
kinds={"python"},
check_specs=[build_check_spec_avro_schema_valid(asset_key)],
)
def _asset(context: AssetExecutionContext, deanslist: DeansListResource):
partition_key = _check.inst(obj=context.partition_key, ttype=MultiPartitionKey)
Expand All @@ -172,9 +169,10 @@ def _asset(context: AssetExecutionContext, deanslist: DeansListResource):

date_partition_key_fy = FiscalYear(datetime=date_partition_key, start_month=7)

total_count, data = deanslist.list(
total_count, data_filepath = deanslist.list(
api_version=api_version,
endpoint=endpoint,
avro_schema=schema,
school_id=int(partition_key.keys_by_dimension["school"]),
params={
"UpdatedSince": date_partition_key.date().isoformat(),
Expand All @@ -184,9 +182,6 @@ def _asset(context: AssetExecutionContext, deanslist: DeansListResource):
},
)

yield Output(value=(data, schema), metadata={"records": total_count})
yield check_avro_schema_valid(
asset_key=context.asset_key, records=data, schema=schema
)
yield Output(value=data_filepath, metadata={"records": total_count})

return _asset
45 changes: 42 additions & 3 deletions src/teamster/libraries/deanslist/resources.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,7 @@
import pathlib

import fastavro
import fastavro.types
import yaml
from dagster import ConfigurableResource, DagsterLogManager, InitResourceContext, _check
from pydantic import PrivateAttr
Expand Down Expand Up @@ -99,16 +103,35 @@ def list(
school_id: int,
params: dict,
page_size: int = 250000,
avro_schema: fastavro.types.Schema | None = None,
*args,
**kwargs,
):
page: int = 1
total_pages: int = 2
total_count: int = 0
data: list[dict] = []
all_data: list[dict] = []

data_filepath = pathlib.Path(
f"env/deanslist/{endpoint}/{params["UpdatedSince"]}/{school_id}/data.avro"
).absolute()

url = self._get_url(*args, api_version=api_version, endpoint=endpoint)

if avro_schema is not None:
data_filepath.parent.mkdir(parents=True, exist_ok=True)

with data_filepath.open("wb") as fo:
fastavro.writer(
fo=fo,
schema=avro_schema,
records=[],
codec="snappy",
strict_allow_default=True,
)

fo = data_filepath.open("a+b")

while page <= total_pages:
params.update({"page_size": page_size, "page": page})

Expand All @@ -118,8 +141,24 @@ def list(

total_count = response_json["total_count"]
total_pages = response_json["total_pages"]
data.extend(response_json["data"])
data = response_json["data"]

if avro_schema is not None:
fastavro.writer(
fo=fo,
schema=avro_schema,
records=data,
codec="snappy",
strict_allow_default=True,
)
else:
all_data.extend(data)

page += 1

return int(total_count), data
fo.close()

if avro_schema is not None:
return int(total_count), data_filepath
else:
return int(total_count), all_data
Loading

0 comments on commit f18cc0f

Please sign in to comment.