diff --git a/backend_py/primary/primary/routers/timeseries/router.py b/backend_py/primary/primary/routers/timeseries/router.py
index 7603db500..00ed2cdca 100644
--- a/backend_py/primary/primary/routers/timeseries/router.py
+++ b/backend_py/primary/primary/routers/timeseries/router.py
@@ -13,7 +13,7 @@
 from primary.services.sumo_access.summary_access import Frequency, SummaryAccess
 from primary.services.sumo_access.summary_types import VectorMetadata
 from primary.services.utils.authenticated_user import AuthenticatedUser
-from primary.services.utils.timeseries_helpers import create_delta_vector_table
+from primary.services.utils.timeseries_helpers import _create_delta_vector_table, create_delta_vector_table_async
 
 from . import converters, schemas
 import asyncio
@@ -84,6 +84,7 @@ async def get_delta_ensemble_vector_list(
     vector_names.intersection_update({vi.name for vi in second_vector_info_arr})
     perf_metrics.record_lap("create-vectors-names-intersection")
 
+    # Create vector descriptions, no historical vectors!
     ret_arr: list[schemas.VectorDescription] = [
         schemas.VectorDescription(name=vi, descriptive_name=vi, has_historical=False) for vi in vector_names
     ]
@@ -148,7 +149,7 @@ async def get_delta_ensemble_realizations_vector_data(
     second_case_uuid: Annotated[str, Query(description="Sumo case uuid")],
     second_ensemble_name: Annotated[str, Query(description="Ensemble name")],
     vector_name: Annotated[str, Query(description="Name of the vector")],
-    resampling_frequency: Annotated[schemas.Frequency | None, Query(description="Resampling frequency. If not specified, raw data without resampling wil be returned.")] = None,
+    resampling_frequency: Annotated[schemas.Frequency, Query(description="Resampling frequency")],
     realizations: Annotated[list[int] | None, Query(description="Optional list of realizations to include. If not specified, all realizations will be returned.")] = None,
     # relative_to_timestamp: Annotated[datetime.datetime | None, Query(description="Calculate relative to timestamp")] = None,
     # fmt:on
@@ -157,51 +158,26 @@ async def get_delta_ensemble_realizations_vector_data(
 
     perf_metrics = ResponsePerfMetrics(response)
 
+    service_freq = Frequency.from_string_value(resampling_frequency.value)
+    if service_freq is None:
+        raise HTTPException(
+            status_code=400, detail="Resampling frequency must be specified to create delta ensemble vector"
+        )
+
     first_access = SummaryAccess.from_case_uuid(
         authenticated_user.get_sumo_access_token(), first_case_uuid, first_ensemble_name
     )
     second_access = SummaryAccess.from_case_uuid(
         authenticated_user.get_sumo_access_token(), second_case_uuid, second_ensemble_name
     )
-
-    sumo_freq = Frequency.from_string_value(resampling_frequency.value if resampling_frequency else "dummy")
-
-    # Get tables parallel
-    # - Resampled data is assumed to be s.t. dates/timestamps are comparable between ensembles and cases, i.e. timestamps
-    #   for a resampling of a daily vector in both ensembles should be the same
-    (first_vector_table_pa, first_metadata), (second_vector_table_pa, _) = await asyncio.gather(
-        first_access.get_vector_table_async(
-            vector_name=vector_name,
-            resampling_frequency=sumo_freq,
-            realizations=realizations,
-        ),
-        second_access.get_vector_table_async(
-            vector_name=vector_name,
-            resampling_frequency=sumo_freq,
-            realizations=realizations,
-        ),
-    )
-    perf_metrics.record_lap("get-vector")
-
-    # Create delta ensemble data
-    delta_table = create_delta_vector_table(first_vector_table_pa, second_vector_table_pa, vector_name)
-    perf_metrics.record_lap("calc-delta-vector")
-
-    # TODO: Fix correct metadata for delta vector
-    delta_vector_metadata = VectorMetadata(
-        name=first_metadata.name,
-        unit=first_metadata.unit,
-        is_total=first_metadata.is_total,
-        is_rate=first_metadata.is_rate,
-        is_historical=first_metadata.is_historical,
-        keyword=first_metadata.keyword,
-        wgname=first_metadata.wgname,
-        get_num=first_metadata.get_num,
+    delta_vector_table, delta_vector_metadata = await create_delta_vector_table_async(
+        first_access, second_access, vector_name, service_freq, realizations
     )
+    perf_metrics.record_lap("create-delta-vector-table")
 
     # TODO: Consider moving this from SummaryAccess to a helper function/util?
     delta_vector_arr = SummaryAccess.create_realization_vector_list_from_vector_table_and_metadata(
-        delta_table, vector_name, delta_vector_metadata
+        delta_vector_table, vector_name, delta_vector_metadata
     )
     perf_metrics.record_lap("create-delta-vector-arr")
 
@@ -332,54 +308,30 @@ async def get_delta_ensemble_statistical_vector_data(
 
     perf_metrics = ResponsePerfMetrics(response)
 
+    service_freq = Frequency.from_string_value(resampling_frequency.value)
+    service_stat_funcs_to_compute = converters.to_service_statistic_functions(statistic_functions)
+
+    if service_freq is None:
+        raise HTTPException(
+            status_code=400, detail="Resampling frequency must be specified to create delta ensemble vector"
+        )
+
     first_access = SummaryAccess.from_case_uuid(
         authenticated_user.get_sumo_access_token(), first_case_uuid, first_ensemble_name
     )
     second_access = SummaryAccess.from_case_uuid(
         authenticated_user.get_sumo_access_token(), second_case_uuid, second_ensemble_name
     )
-
-    service_freq = Frequency.from_string_value(resampling_frequency.value)
-    service_stat_funcs_to_compute = converters.to_service_statistic_functions(statistic_functions)
-
-    # Get tables parallel
-    # - Resampled data is assumed to be s.t. dates/timestamps are comparable between ensembles and cases, i.e. timestamps
-    #   for a resampling of a daily vector in both ensembles should be the same
-    (first_vector_table_pa, first_metadata), (second_vector_table_pa, _) = await asyncio.gather(
-        first_access.get_vector_table_async(
-            vector_name=vector_name,
-            resampling_frequency=service_freq,
-            realizations=realizations,
-        ),
-        second_access.get_vector_table_async(
-            vector_name=vector_name,
-            resampling_frequency=service_freq,
-            realizations=realizations,
-        ),
+    delta_vector_table, delta_vector_metadata = await create_delta_vector_table_async(
+        first_access, second_access, vector_name, service_freq, realizations
     )
-    perf_metrics.record_lap("get-vector")
+    perf_metrics.record_lap("create-delta-vector-table")
 
-    # Create delta ensemble data
-    delta_table = create_delta_vector_table(first_vector_table_pa, second_vector_table_pa, vector_name)
-    perf_metrics.record_lap("calc-delta-vector")
-
-    statistics = compute_vector_statistics(delta_table, vector_name, service_stat_funcs_to_compute)
+    statistics = compute_vector_statistics(delta_vector_table, vector_name, service_stat_funcs_to_compute)
     if not statistics:
         raise HTTPException(status_code=404, detail="Could not compute statistics")
     perf_metrics.record_lap("calc-delta-vector-stat")
 
-    # TODO: Fix correct metadata for delta vector
-    delta_vector_metadata = VectorMetadata(
-        name=first_metadata.name,
-        unit=first_metadata.unit,
-        is_total=first_metadata.is_total,
-        is_rate=first_metadata.is_rate,
-        is_historical=first_metadata.is_historical,
-        keyword=first_metadata.keyword,
-        wgname=first_metadata.wgname,
-        get_num=first_metadata.get_num,
-    )
-
     ret_data: schemas.VectorStatisticData = converters.to_api_vector_statistic_data(statistics, delta_vector_metadata)
 
     LOGGER.info(f"Loaded and computed delta ensemble statistical summary data in: {perf_metrics.to_string()}")
diff --git a/backend_py/primary/primary/services/utils/timeseries_helpers.py b/backend_py/primary/primary/services/utils/timeseries_helpers.py
index e1217b215..d94c873d0 100644
--- a/backend_py/primary/primary/services/utils/timeseries_helpers.py
+++ b/backend_py/primary/primary/services/utils/timeseries_helpers.py
@@ -1,10 +1,14 @@
 import pyarrow as pa
 import pyarrow.compute as pc
 
+from primary.services.sumo_access.summary_access import SummaryAccess
+from primary.services.sumo_access.summary_types import Frequency, VectorMetadata
 from primary.services.service_exceptions import InvalidDataError, Service
 
+import asyncio
+
 
-def is_valid_vector_table(vector_table: pa.Table, vector_name: str) -> bool:
+def _is_valid_vector_table(vector_table: pa.Table, vector_name: str) -> bool:
     """
     Check if the vector table is valid.
 
@@ -16,7 +20,7 @@ def is_valid_vector_table(vector_table: pa.Table, vector_name: str) -> bool:
         raise InvalidDataError(f"Unexpected columns in table {unexpected_columns}", Service.SUMO)
 
 
-def create_delta_vector_table(
+def _create_delta_vector_table(
     first_vector_table: pa.Table, second_vector_table: pa.Table, vector_name: str
 ) -> pa.Table:
     """
@@ -25,12 +29,12 @@ def create_delta_vector_table(
     Performs "inner join". Only obtain matching index ["DATE", "REAL"] - i.e "DATE"-"REAL" combination
     present in only one vector is neglected.
 
-    Note: Pre-processing of DATE-columns, e.g. resampling, should be done before calling this function.
+    Returns: A table with columns ["DATE", "REAL", vector_name] where vector_name contains the delta values.
 
-    `Returns` a table with columns ["DATE", "REAL", vector_name] where vector_name contains the delta values.
+    `Note`: Pre-processing of DATE-columns, e.g. resampling, should be done before calling this function.
     """
-    is_valid_vector_table(first_vector_table, vector_name)
-    is_valid_vector_table(second_vector_table, vector_name)
+    _is_valid_vector_table(first_vector_table, vector_name)
+    _is_valid_vector_table(second_vector_table, vector_name)
 
     joined_vector_table = first_vector_table.join(
         second_vector_table, keys=["DATE", "REAL"], join_type="inner", right_suffix="_second"
@@ -38,6 +42,8 @@ def create_delta_vector_table(
     delta_vector = pc.subtract(
         joined_vector_table.column(vector_name), joined_vector_table.column(f"{vector_name}_second")
     )
+
+    # TODO: Should a schema be defined for the delta vector?
     delta_table = pa.table(
         {
             "DATE": joined_vector_table.column("DATE"),
@@ -47,3 +53,58 @@ def create_delta_vector_table(
     )
 
     return delta_table
+
+
+async def create_delta_vector_table_async(
+    first_access: SummaryAccess,
+    second_access: SummaryAccess,
+    vector_name: str,
+    resampling_frequency: Frequency,
+    realizations: list[int] | None,
+) -> tuple[pa.Table, VectorMetadata]:
+    """
+    Fetch the requested vector from two ensembles and create a table with the delta values.
+
+    The vector tables are fetched concurrently with the same resampling frequency and realizations,
+    and the delta is calculated as first ensemble minus second ensemble.
+
+    Performs "inner join". Only obtain matching index ["DATE", "REAL"] - i.e "DATE"-"REAL" combination
+    present in only one vector is neglected.
+
+    Returns: A tuple of the delta table, with columns ["DATE", "REAL", vector_name] where vector_name
+    contains the delta values, and the vector metadata (currently copied from the first ensemble).
+
+    `Note`: Resampling is requested as part of the fetch, so no pre-processing of DATE-columns is needed.
+    """
+    # Get tables in parallel
+    # - Resampled data is assumed to be s.t. dates/timestamps are comparable between ensembles and cases, i.e. timestamps
+    #   for a resampling of a daily vector in both ensembles should be the same
+    (first_vector_table_pa, first_metadata), (second_vector_table_pa, _) = await asyncio.gather(
+        first_access.get_vector_table_async(
+            vector_name=vector_name,
+            resampling_frequency=resampling_frequency,
+            realizations=realizations,
+        ),
+        second_access.get_vector_table_async(
+            vector_name=vector_name,
+            resampling_frequency=resampling_frequency,
+            realizations=realizations,
+        ),
+    )
+
+    # Create delta ensemble data
+    delta_table = _create_delta_vector_table(first_vector_table_pa, second_vector_table_pa, vector_name)
+
+    # TODO: Fix correct metadata for delta vector
+    delta_vector_metadata = VectorMetadata(
+        name=first_metadata.name,
+        unit=first_metadata.unit,
+        is_total=first_metadata.is_total,
+        is_rate=first_metadata.is_rate,
+        is_historical=first_metadata.is_historical,
+        keyword=first_metadata.keyword,
+        wgname=first_metadata.wgname,
+        get_num=first_metadata.get_num,
+    )
+
+    return delta_table, delta_vector_metadata