Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: list to avro file #2263

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ select
actions,
custom_fields,
penalties,
attachments,

/* transformations */
nullif(addlreqs, '') as addl_reqs,
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
select
i.incident_id,

a.attachmenttype as attachment_type,
a.contenttype as content_type,
a.entityname as entity_name,
a.entitytype as entity_type,
a.internalfilename as internal_filename,
a.internalfolder as internal_folder,
a.minuserlevel as min_user_level,
a.minuserlevelgroupname as min_user_level_group_name,
a.publicfilename as public_filename,
a.reporttype as report_type,
a.sourcetype as source_type,
a.url,

a.filepostedat.timezone as file_posted_at__timezone,
a.filepostedat.timezone_type as file_posted_at__timezone_type,

cast(a.attachmentid as int) as attachment_id,
cast(a.bytes as int) as bytes,
cast(a.entityid as int) as entity_id,
cast(a.schoolid as int) as school_id,
cast(a.sourceid as int) as source_id,
cast(a.studentid as int) as student_id,
cast(a.termid as int) as term_id,

cast(a.reportdate as date) as report_date,

cast(a.filepostedat.`date` as datetime) as file_posted_at__date,
from {{ ref("stg_deanslist__incidents") }} as i
cross join unnest(i.attachments) as a
7 changes: 4 additions & 3 deletions src/dbt/kipptaf/models/amplify/dds/sources-external.yml
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,10 @@ version: 2

sources:
- name: amplify
schema:
"{% if env_var('DBT_DEV', '') == 'true' %}_dev_kipptaf_amplify{% else %}kipptaf_amplify{%
endif %}"
schema: |
{% if env_var('DBT_DEV', '') == 'true' -%}_dev_kipptaf_amplify
{%- else -%}kipptaf_amplify
{%- endif %}
tags:
- stage_external_sources
tables:
Expand Down
7 changes: 4 additions & 3 deletions src/dbt/kipptaf/models/amplify/dibels/sources-drive.yml
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,10 @@ version: 2

sources:
- name: amplify
schema:
"{% if env_var('DBT_DEV', '') == 'true' %}_dev_kipptaf_amplify{% else %}kipptaf_amplify{%
endif %}"
schema: |
{% if env_var('DBT_DEV', '') == 'true' -%}_dev_kipptaf_amplify
{%- else -%}kipptaf_amplify
{%- endif %}
tags:
- stage_external_sources
tables:
Expand Down
7 changes: 4 additions & 3 deletions src/dbt/kipptaf/models/amplify/mclass/sources-external.yml
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,10 @@ version: 2

sources:
- name: amplify
schema:
"{% if env_var('DBT_DEV', '') == 'true' %}_dev_kipptaf_amplify{% else %}kipptaf_amplify{%
endif %}"
schema: |
{% if env_var('DBT_DEV', '') == 'true' -%}_dev_kipptaf_amplify
{%- else -%}kipptaf_amplify
{%- endif %}
tags:
- stage_external_sources
tables:
Expand Down
12 changes: 12 additions & 0 deletions src/dbt/kipptaf/models/deanslist/sources-kippcamden.yml
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,10 @@ version: 2

sources:
- name: kippcamden_deanslist
schema: |
{% if env_var('DBT_DEV', '') == 'true' -%}_dev_kippcamden_deanslist
{%- else -%}kippcamden_deanslist
{%- endif %}
tables:
- name: stg_deanslist__behavior
meta:
Expand Down Expand Up @@ -139,3 +143,11 @@ sources:
- kippcamden
- deanslist
- stg_deanslist__dff_stats
- name: stg_deanslist__incidents__attachments
meta:
dagster:
group: deanslist
asset_key:
- kippcamden
- deanslist
- stg_deanslist__incidents__attachments
12 changes: 12 additions & 0 deletions src/dbt/kipptaf/models/deanslist/sources-kippmiami.yml
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,10 @@ version: 2

sources:
- name: kippmiami_deanslist
schema: |
{% if env_var('DBT_DEV', '') == 'true' -%}_dev_kippmiami_deanslist
{%- else -%}kippmiami_deanslist
{%- endif %}
tables:
- name: stg_deanslist__behavior
meta:
Expand Down Expand Up @@ -139,3 +143,11 @@ sources:
- kippmiami
- deanslist
- stg_deanslist__dff_stats
- name: stg_deanslist__incidents__attachments
meta:
dagster:
group: deanslist
asset_key:
- kippmiami
- deanslist
- stg_deanslist__incidents__attachments
12 changes: 12 additions & 0 deletions src/dbt/kipptaf/models/deanslist/sources-kippnewark.yml
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,10 @@ version: 2

sources:
- name: kippnewark_deanslist
schema: |
{% if env_var('DBT_DEV', '') == 'true' -%}_dev_kippnewark_deanslist
{%- else -%}kippnewark_deanslist
{%- endif %}
tables:
- name: stg_deanslist__behavior
meta:
Expand Down Expand Up @@ -139,3 +143,11 @@ sources:
- kippnewark
- deanslist
- stg_deanslist__dff_stats
- name: stg_deanslist__incidents__attachments
meta:
dagster:
group: deanslist
asset_key:
- kippnewark
- deanslist
- stg_deanslist__incidents__attachments
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
{{
dbt_utils.union_relations(
relations=[
source("kippnewark_deanslist", "stg_deanslist__incidents__attachments"),
source("kippcamden_deanslist", "stg_deanslist__incidents__attachments"),
source("kippmiami_deanslist", "stg_deanslist__incidents__attachments"),
]
)
}}
7 changes: 4 additions & 3 deletions src/dbt/kipptaf/models/iready/sources-kippmiami.yml
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,10 @@ version: 2

sources:
- name: kippmiami_iready
schema:
"{% if env_var('DBT_DEV', '') == 'true' %}_dev_kippmiami_iready{% else %}kippmiami_iready{%
endif %}"
schema: |
{% if env_var('DBT_DEV', '') == 'true' -%}_dev_kippmiami_iready
{%- else -%}kippmiami_iready
{%- endif %}
tables:
- name: stg_iready__diagnostic_results
meta:
Expand Down
7 changes: 4 additions & 3 deletions src/dbt/kipptaf/models/iready/sources-kippnj.yml
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,10 @@ version: 2

sources:
- name: kippnj_iready
schema:
"{% if env_var('DBT_DEV', '') == 'true' %}_dev_kippnewark_iready{% else %}kippnewark_iready{%
endif %}"
schema: |
{% if env_var('DBT_DEV', '') == 'true' -%}_dev_kippnewark_iready
{%- else -%}kippnewark_iready
{%- endif %}
tables:
- name: stg_iready__diagnostic_results
meta:
Expand Down
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
{{
dbt_utils.union_relations(
relations=[
source("kippnj_iready", model.name),
source("kippmiami_iready", model.name),
source("kippnj_iready", "stg_iready__instruction_by_lesson_pro"),
source("kippmiami_iready", "stg_iready__instruction_by_lesson_pro"),
]
)
}}
10 changes: 0 additions & 10 deletions src/teamster/code_locations/kippcamden/deanslist/assets.py
Original file line number Diff line number Diff line change
Expand Up @@ -90,16 +90,6 @@
api_version="v1",
schema=BEHAVIOR_SCHEMA,
partitions_def=DEANSLIST_FISCAL_MULTI_PARTITIONS_DEF,
op_tags={
"dagster-k8s/config": {
"container_config": {
"resources": {
"requests": {"memory": "0.5Gi"},
"limits": {"memory": "6.5Gi"},
}
}
}
},
)

fiscal_multi_partitions_assets = [behavior, *fiscal_multi_partitions_assets]
Expand Down
10 changes: 0 additions & 10 deletions src/teamster/code_locations/kippnewark/deanslist/assets.py
Original file line number Diff line number Diff line change
Expand Up @@ -90,16 +90,6 @@
api_version="v1",
schema=BEHAVIOR_SCHEMA,
partitions_def=DEANSLIST_FISCAL_MULTI_PARTITIONS_DEF,
op_tags={
"dagster-k8s/config": {
"container_config": {
"resources": {
"requests": {"cpu": "250m", "memory": "0.5Gi"},
"limits": {"cpu": "1250m", "memory": "4.0Gi"},
}
}
}
},
)

fiscal_multi_partitions_assets = [behavior, *fiscal_multi_partitions_assets]
Expand Down
13 changes: 13 additions & 0 deletions src/teamster/core/resources.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
import os

from dagster import EnvVar
from dagster_dbt import DbtCliResource
from dagster_gcp import BigQueryResource, GCSResource
Expand Down Expand Up @@ -71,12 +73,19 @@


def get_io_manager_gcs_pickle(code_location):
if os.getenv("DAGSTER_CLOUD_IS_BRANCH_DEPLOYMENT") == "1":
code_location = "test"

return GCSIOManager(
gcs=GCS_RESOURCE, gcs_bucket=f"teamster-{code_location}", object_type="pickle"
)


def get_io_manager_gcs_avro(code_location, test=False):
if os.getenv("DAGSTER_CLOUD_IS_BRANCH_DEPLOYMENT") == "1":
code_location = "test"
test = True

return GCSIOManager(
gcs=GCS_RESOURCE,
gcs_bucket=f"teamster-{code_location}",
Expand All @@ -86,6 +95,10 @@ def get_io_manager_gcs_avro(code_location, test=False):


def get_io_manager_gcs_file(code_location, test=False):
if os.getenv("DAGSTER_CLOUD_IS_BRANCH_DEPLOYMENT") == "1":
code_location = "test"
test = True

return GCSIOManager(
gcs=GCS_RESOURCE,
gcs_bucket=f"teamster-{code_location}",
Expand Down
17 changes: 6 additions & 11 deletions src/teamster/libraries/deanslist/assets.py
Original file line number Diff line number Diff line change
Expand Up @@ -143,25 +143,22 @@ def build_deanslist_paginated_multi_partition_asset(
code_location: str,
endpoint: str,
api_version: str,
schema,
schema: dict,
partitions_def: MultiPartitionsDefinition,
op_tags: dict | None = None,
params: dict | None = None,
) -> AssetsDefinition:
if params is None:
params = {}

asset_key = [code_location, "deanslist", "behavior"]

@asset(
key=asset_key,
key=[code_location, "deanslist", "behavior"],
metadata=params,
io_manager_key="io_manager_gcs_avro",
io_manager_key="io_manager_gcs_file",
partitions_def=partitions_def,
op_tags=op_tags,
group_name="deanslist",
kinds={"python"},
check_specs=[build_check_spec_avro_schema_valid(asset_key)],
)
def _asset(context: AssetExecutionContext, deanslist: DeansListResource):
partition_key = _check.inst(obj=context.partition_key, ttype=MultiPartitionKey)
Expand All @@ -172,9 +169,10 @@ def _asset(context: AssetExecutionContext, deanslist: DeansListResource):

date_partition_key_fy = FiscalYear(datetime=date_partition_key, start_month=7)

total_count, data = deanslist.list(
total_count, data_filepath = deanslist.list(
api_version=api_version,
endpoint=endpoint,
avro_schema=schema,
school_id=int(partition_key.keys_by_dimension["school"]),
params={
"UpdatedSince": date_partition_key.date().isoformat(),
Expand All @@ -184,9 +182,6 @@ def _asset(context: AssetExecutionContext, deanslist: DeansListResource):
},
)

yield Output(value=(data, schema), metadata={"records": total_count})
yield check_avro_schema_valid(
asset_key=context.asset_key, records=data, schema=schema
)
yield Output(value=data_filepath, metadata={"records": total_count})

return _asset
45 changes: 42 additions & 3 deletions src/teamster/libraries/deanslist/resources.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,7 @@
import pathlib

import fastavro
import fastavro.types
import yaml
from dagster import ConfigurableResource, DagsterLogManager, InitResourceContext, _check
from pydantic import PrivateAttr
Expand Down Expand Up @@ -99,16 +103,35 @@ def list(
school_id: int,
params: dict,
page_size: int = 250000,
avro_schema: fastavro.types.Schema | None = None,
*args,
**kwargs,
):
page: int = 1
total_pages: int = 2
total_count: int = 0
data: list[dict] = []
all_data: list[dict] = []

data_filepath = pathlib.Path(
f"env/deanslist/{endpoint}/{params["UpdatedSince"]}/{school_id}/data.avro"
).absolute()

url = self._get_url(*args, api_version=api_version, endpoint=endpoint)

if avro_schema is not None:
data_filepath.parent.mkdir(parents=True, exist_ok=True)

with data_filepath.open("wb") as fo:
fastavro.writer(
fo=fo,
schema=avro_schema,
records=[],
codec="snappy",
strict_allow_default=True,
)

fo = data_filepath.open("a+b")

while page <= total_pages:
params.update({"page_size": page_size, "page": page})

Expand All @@ -118,8 +141,24 @@ def list(

total_count = response_json["total_count"]
total_pages = response_json["total_pages"]
data.extend(response_json["data"])
data = response_json["data"]

if avro_schema is not None:
fastavro.writer(
fo=fo,
schema=avro_schema,
records=data,
codec="snappy",
strict_allow_default=True,
)
else:
all_data.extend(data)

page += 1

return int(total_count), data
fo.close()

if avro_schema is not None:
return int(total_count), data_filepath
else:
return int(total_count), all_data
Loading
Loading