From ef1e364df2eb3dba4b1f5bf0c0ae765dc978295f Mon Sep 17 00:00:00 2001 From: Charlie Bini <5003326+cbini@users.noreply.github.com> Date: Fri, 17 Nov 2023 04:07:24 +0000 Subject: [PATCH] rm deprecated fn --- src/teamster/core/deanslist/assets.py | 110 +++++++++----------------- 1 file changed, 39 insertions(+), 71 deletions(-) diff --git a/src/teamster/core/deanslist/assets.py b/src/teamster/core/deanslist/assets.py index 47e8c2bbeb..2fe6b5cd58 100644 --- a/src/teamster/core/deanslist/assets.py +++ b/src/teamster/core/deanslist/assets.py @@ -3,9 +3,9 @@ import pendulum from dagster import ( + AssetExecutionContext, AssetsDefinition, MultiPartitionsDefinition, - OpExecutionContext, Output, StaticPartitionsDefinition, asset, @@ -33,7 +33,7 @@ def build_static_partition_asset( op_tags=op_tags, io_manager_key="io_manager_gcs_avro", ) - def _asset(context: OpExecutionContext, deanslist: DeansListResource): + def _asset(context: AssetExecutionContext, deanslist: DeansListResource): endpoint_content = deanslist.get( api_version=api_version, endpoint=asset_name, @@ -73,24 +73,12 @@ def build_multi_partition_asset( op_tags=op_tags, io_manager_key="io_manager_gcs_avro", ) - def _asset(context: OpExecutionContext, deanslist: DeansListResource): - asset_key = context.asset_key_for_output() + def _asset(context: AssetExecutionContext, deanslist: DeansListResource): school_partition = context.partition_key.keys_by_dimension["school"] date_partition = pendulum.from_format( string=context.partition_key.keys_by_dimension["date"], fmt="YYYY-MM-DD" ).subtract(days=1) - # check if school paritition has ever been materialized - school_materialization_count = 0 - asset_materialization_counts = ( - context.instance.get_materialization_count_by_partition([asset_key]).get( - asset_key, {} - ) - ) - for partition_key, count in asset_materialization_counts.items(): - if school_partition == partition_key.split("|")[-1]: - school_materialization_count += count - # determine if endpoint is within time-window if set(["StartDate", "EndDate"]).issubset(params.keys()) or set( ["sdt", "edt"] @@ -101,68 +89,48 @@ def _asset(context: OpExecutionContext, deanslist: DeansListResource): # determine start and end dates partition_fy = FiscalYear(datetime=date_partition, start_month=7) - inception_fy = FiscalYear(datetime=inception_date, start_month=7) - - if ( - school_materialization_count == 0 - or school_materialization_count == context.retry_number - ): - start_date = inception_fy.start - if is_time_bound: - end_date = partition_fy.end - else: - end_date = inception_fy.end - - partition_modified_date = None - else: - start_date = partition_fy.start - end_date = partition_fy.end - partition_modified_date = date_partition - multiyear_period = end_date - start_date total_row_count = 0 all_data = [] - for year_start in multiyear_period.range(unit="years"): - fiscal_year = FiscalYear(datetime=year_start, start_month=7) - - fy_period = fiscal_year.end - fiscal_year.start - - for month in fy_period.range(unit="months"): - modified_date = partition_modified_date or fiscal_year.start - composed_params = copy.deepcopy(params) - - for k, v in composed_params.items(): - if isinstance(v, str): - composed_params[k] = v.format( - start_date=month.start_of("month").to_date_string(), - end_date=month.end_of("month").to_date_string(), - ) - - endpoint_content = deanslist.get( - api_version=api_version, - endpoint=asset_name, - school_id=int(school_partition), - params={ - "UpdatedSince": modified_date.to_date_string(), - **composed_params, - }, - ) - - row_count = endpoint_content["row_count"] - data = endpoint_content["data"] - del endpoint_content - gc.collect() + fy_period = partition_fy.end - partition_fy.start + + for month in fy_period.range(unit="months"): + composed_params = copy.deepcopy(params) + + for k, v in composed_params.items(): + if isinstance(v, str): + composed_params[k] = v.format( + start_date=month.start_of("month").to_date_string(), + end_date=month.end_of("month").to_date_string(), + ) + + endpoint_content = deanslist.get( + api_version=api_version, + endpoint=asset_name, + school_id=int(school_partition), + params={ + "UpdatedSince": date_partition.to_date_string(), + **composed_params, + }, + ) + + row_count = endpoint_content["row_count"] + data = endpoint_content["data"] + + del endpoint_content + gc.collect() - if row_count > 0: - total_row_count += row_count - all_data.extend(data) - del data - gc.collect() + if row_count > 0: + total_row_count += row_count + all_data.extend(data) + + del data + gc.collect() - # break loop for endpoints w/o start/end dates - if not is_time_bound: - break + # break loop for endpoints w/o start/end dates + if not is_time_bound: + break yield Output( value=(