Skip to content

Commit

Permalink
rm deprecated fn
Browse files Browse the repository at this point in the history
  • Loading branch information
cbini committed Nov 17, 2023
1 parent 8b27854 commit ef1e364
Showing 1 changed file with 39 additions and 71 deletions.
110 changes: 39 additions & 71 deletions src/teamster/core/deanslist/assets.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,9 @@

import pendulum
from dagster import (
AssetExecutionContext,
AssetsDefinition,
MultiPartitionsDefinition,
OpExecutionContext,
Output,
StaticPartitionsDefinition,
asset,
Expand Down Expand Up @@ -33,7 +33,7 @@ def build_static_partition_asset(
op_tags=op_tags,
io_manager_key="io_manager_gcs_avro",
)
def _asset(context: OpExecutionContext, deanslist: DeansListResource):
def _asset(context: AssetExecutionContext, deanslist: DeansListResource):
endpoint_content = deanslist.get(
api_version=api_version,
endpoint=asset_name,
Expand Down Expand Up @@ -73,24 +73,12 @@ def build_multi_partition_asset(
op_tags=op_tags,
io_manager_key="io_manager_gcs_avro",
)
def _asset(context: OpExecutionContext, deanslist: DeansListResource):
asset_key = context.asset_key_for_output()
def _asset(context: AssetExecutionContext, deanslist: DeansListResource):
school_partition = context.partition_key.keys_by_dimension["school"]
date_partition = pendulum.from_format(
string=context.partition_key.keys_by_dimension["date"], fmt="YYYY-MM-DD"
).subtract(days=1)

# check if school partition has ever been materialized
school_materialization_count = 0
asset_materialization_counts = (
context.instance.get_materialization_count_by_partition([asset_key]).get(
asset_key, {}
)
)
for partition_key, count in asset_materialization_counts.items():
if school_partition == partition_key.split("|")[-1]:
school_materialization_count += count

# determine if endpoint is within time-window
if set(["StartDate", "EndDate"]).issubset(params.keys()) or set(
["sdt", "edt"]
Expand All @@ -101,68 +89,48 @@ def _asset(context: OpExecutionContext, deanslist: DeansListResource):

# determine start and end dates
partition_fy = FiscalYear(datetime=date_partition, start_month=7)
inception_fy = FiscalYear(datetime=inception_date, start_month=7)

if (
school_materialization_count == 0
or school_materialization_count == context.retry_number
):
start_date = inception_fy.start
if is_time_bound:
end_date = partition_fy.end
else:
end_date = inception_fy.end

partition_modified_date = None
else:
start_date = partition_fy.start
end_date = partition_fy.end
partition_modified_date = date_partition

multiyear_period = end_date - start_date
total_row_count = 0
all_data = []

for year_start in multiyear_period.range(unit="years"):
fiscal_year = FiscalYear(datetime=year_start, start_month=7)

fy_period = fiscal_year.end - fiscal_year.start

for month in fy_period.range(unit="months"):
modified_date = partition_modified_date or fiscal_year.start
composed_params = copy.deepcopy(params)

for k, v in composed_params.items():
if isinstance(v, str):
composed_params[k] = v.format(
start_date=month.start_of("month").to_date_string(),
end_date=month.end_of("month").to_date_string(),
)

endpoint_content = deanslist.get(
api_version=api_version,
endpoint=asset_name,
school_id=int(school_partition),
params={
"UpdatedSince": modified_date.to_date_string(),
**composed_params,
},
)

row_count = endpoint_content["row_count"]
data = endpoint_content["data"]
del endpoint_content
gc.collect()
fy_period = partition_fy.end - partition_fy.start

for month in fy_period.range(unit="months"):
composed_params = copy.deepcopy(params)

for k, v in composed_params.items():
if isinstance(v, str):
composed_params[k] = v.format(
start_date=month.start_of("month").to_date_string(),
end_date=month.end_of("month").to_date_string(),
)

endpoint_content = deanslist.get(
api_version=api_version,
endpoint=asset_name,
school_id=int(school_partition),
params={
"UpdatedSince": date_partition.to_date_string(),
**composed_params,
},
)

row_count = endpoint_content["row_count"]
data = endpoint_content["data"]

del endpoint_content
gc.collect()

if row_count > 0:
total_row_count += row_count
all_data.extend(data)
del data
gc.collect()
if row_count > 0:
total_row_count += row_count
all_data.extend(data)

del data
gc.collect()

# break loop for endpoints w/o start/end dates
if not is_time_bound:
break
# break loop for endpoints w/o start/end dates
if not is_time_bound:
break

yield Output(
value=(
Expand Down

0 comments on commit ef1e364

Please sign in to comment.