Skip to content

Commit

Permalink
Adjust back-end
Browse files Browse the repository at this point in the history
  • Loading branch information
jorgenherje committed Nov 25, 2024
1 parent 5dedec1 commit a1c0a3c
Show file tree
Hide file tree
Showing 2 changed files with 86 additions and 79 deletions.
96 changes: 23 additions & 73 deletions backend_py/primary/primary/routers/timeseries/router.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
from primary.services.sumo_access.summary_access import Frequency, SummaryAccess
from primary.services.sumo_access.summary_types import VectorMetadata
from primary.services.utils.authenticated_user import AuthenticatedUser
from primary.services.utils.timeseries_helpers import create_delta_vector_table
from primary.services.utils.timeseries_helpers import _create_delta_vector_table, create_delta_vector_table_async

from . import converters, schemas
import asyncio
Expand Down Expand Up @@ -84,6 +84,7 @@ async def get_delta_ensemble_vector_list(
vector_names.intersection_update({vi.name for vi in second_vector_info_arr})
perf_metrics.record_lap("create-vectors-names-intersection")

# Create vector descriptions, no historical vectors!
ret_arr: list[schemas.VectorDescription] = [
schemas.VectorDescription(name=vi, descriptive_name=vi, has_historical=False) for vi in vector_names
]
Expand Down Expand Up @@ -148,7 +149,7 @@ async def get_delta_ensemble_realizations_vector_data(
second_case_uuid: Annotated[str, Query(description="Sumo case uuid")],
second_ensemble_name: Annotated[str, Query(description="Ensemble name")],
vector_name: Annotated[str, Query(description="Name of the vector")],
resampling_frequency: Annotated[schemas.Frequency | None, Query(description="Resampling frequency. If not specified, raw data without resampling wil be returned.")] = None,
resampling_frequency: Annotated[schemas.Frequency, Query(description="Resampling frequency")],
realizations: Annotated[list[int] | None, Query(description="Optional list of realizations to include. If not specified, all realizations will be returned.")] = None,
# relative_to_timestamp: Annotated[datetime.datetime | None, Query(description="Calculate relative to timestamp")] = None,
# fmt:on
Expand All @@ -157,51 +158,25 @@ async def get_delta_ensemble_realizations_vector_data(

perf_metrics = ResponsePerfMetrics(response)

service_freq = Frequency.from_string_value(resampling_frequency.value)
if service_freq is None:
raise HTTPException(
status_code=400, detail="Resampling frequency must be specified to create delta ensemble vector"
)

first_access = SummaryAccess.from_case_uuid(
authenticated_user.get_sumo_access_token(), first_case_uuid, first_ensemble_name
)
second_access = SummaryAccess.from_case_uuid(
authenticated_user.get_sumo_access_token(), second_case_uuid, second_ensemble_name
)

sumo_freq = Frequency.from_string_value(resampling_frequency.value if resampling_frequency else "dummy")

# Get tables parallel
# - Resampled data is assumed to be s.t. dates/timestamps are comparable between ensembles and cases, i.e. timestamps
# for a resampling of a daily vector in both ensembles should be the same
(first_vector_table_pa, first_metadata), (second_vector_table_pa, _) = await asyncio.gather(
first_access.get_vector_table_async(
vector_name=vector_name,
resampling_frequency=sumo_freq,
realizations=realizations,
),
second_access.get_vector_table_async(
vector_name=vector_name,
resampling_frequency=sumo_freq,
realizations=realizations,
),
)
perf_metrics.record_lap("get-vector")

# Create delta ensemble data
delta_table = create_delta_vector_table(first_vector_table_pa, second_vector_table_pa, vector_name)
perf_metrics.record_lap("calc-delta-vector")

# TODO: Fix correct metadata for delta vector
delta_vector_metadata = VectorMetadata(
name=first_metadata.name,
unit=first_metadata.unit,
is_total=first_metadata.is_total,
is_rate=first_metadata.is_rate,
is_historical=first_metadata.is_historical,
keyword=first_metadata.keyword,
wgname=first_metadata.wgname,
get_num=first_metadata.get_num,
delta_vector_table, delta_vector_metadata = create_delta_vector_table_async(
first_access, second_access, vector_name, service_freq, realizations, perf_metrics
)

# TODO: Consider moving this from SummaryAccess to a helper function/util?
delta_vector_arr = SummaryAccess.create_realization_vector_list_from_vector_table_and_metadata(
delta_table, vector_name, delta_vector_metadata
delta_vector_table, vector_name, delta_vector_metadata
)
perf_metrics.record_lap("create-delta-vector-arr")

Expand Down Expand Up @@ -332,54 +307,29 @@ async def get_delta_ensemble_statistical_vector_data(

perf_metrics = ResponsePerfMetrics(response)

service_freq = Frequency.from_string_value(resampling_frequency.value)
service_stat_funcs_to_compute = converters.to_service_statistic_functions(statistic_functions)

if service_freq is None:
raise HTTPException(
status_code=400, detail="Resampling frequency must be specified to create delta ensemble vector"
)

first_access = SummaryAccess.from_case_uuid(
authenticated_user.get_sumo_access_token(), first_case_uuid, first_ensemble_name
)
second_access = SummaryAccess.from_case_uuid(
authenticated_user.get_sumo_access_token(), second_case_uuid, second_ensemble_name
)

service_freq = Frequency.from_string_value(resampling_frequency.value)
service_stat_funcs_to_compute = converters.to_service_statistic_functions(statistic_functions)

# Get tables parallel
# - Resampled data is assumed to be s.t. dates/timestamps are comparable between ensembles and cases, i.e. timestamps
# for a resampling of a daily vector in both ensembles should be the same
(first_vector_table_pa, first_metadata), (second_vector_table_pa, _) = await asyncio.gather(
first_access.get_vector_table_async(
vector_name=vector_name,
resampling_frequency=service_freq,
realizations=realizations,
),
second_access.get_vector_table_async(
vector_name=vector_name,
resampling_frequency=service_freq,
realizations=realizations,
),
delta_vector_table, delta_vector_metadata = create_delta_vector_table_async(
first_access, second_access, vector_name, service_freq, realizations, perf_metrics
)
perf_metrics.record_lap("get-vector")

# Create delta ensemble data
delta_table = create_delta_vector_table(first_vector_table_pa, second_vector_table_pa, vector_name)
perf_metrics.record_lap("calc-delta-vector")

statistics = compute_vector_statistics(delta_table, vector_name, service_stat_funcs_to_compute)
statistics = compute_vector_statistics(delta_vector_table, vector_name, service_stat_funcs_to_compute)
if not statistics:
raise HTTPException(status_code=404, detail="Could not compute statistics")
perf_metrics.record_lap("calc-delta-vector-stat")

# TODO: Fix correct metadata for delta vector
delta_vector_metadata = VectorMetadata(
name=first_metadata.name,
unit=first_metadata.unit,
is_total=first_metadata.is_total,
is_rate=first_metadata.is_rate,
is_historical=first_metadata.is_historical,
keyword=first_metadata.keyword,
wgname=first_metadata.wgname,
get_num=first_metadata.get_num,
)

ret_data: schemas.VectorStatisticData = converters.to_api_vector_statistic_data(statistics, delta_vector_metadata)

LOGGER.info(f"Loaded and computed delta ensemble statistical summary data in: {perf_metrics.to_string()}")
Expand Down
69 changes: 63 additions & 6 deletions backend_py/primary/primary/services/utils/timeseries_helpers.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,14 @@
import asyncio
from typing import Protocol

import pyarrow as pa
import pyarrow.compute as pc

from primary.services.service_exceptions import InvalidDataError, Service
from primary.services.sumo_access.summary_access import SummaryAccess
from primary.services.sumo_access.summary_types import Frequency, VectorMetadata

def is_valid_vector_table(vector_table: pa.Table, vector_name: str) -> bool:

def _is_valid_vector_table(vector_table: pa.Table, vector_name: str) -> bool:
"""
Check if the vector table is valid.
Expand All @@ -16,7 +20,7 @@ def is_valid_vector_table(vector_table: pa.Table, vector_name: str) -> bool:
raise InvalidDataError(f"Unexpected columns in table {unexpected_columns}", Service.SUMO)


def create_delta_vector_table(
def _create_delta_vector_table(
first_vector_table: pa.Table, second_vector_table: pa.Table, vector_name: str
) -> pa.Table:
"""
Expand All @@ -25,19 +29,21 @@ def create_delta_vector_table(
Performs "inner join". Only obtain matching index ["DATE", "REAL"] - i.e "DATE"-"REAL" combination
present in only one vector is neglected.
Note: Pre-processing of DATE-columns, e.g. resampling, should be done before calling this function.
Returns: A table with columns ["DATE", "REAL", vector_name] where vector_name contains the delta values.
`Returns` a table with columns ["DATE", "REAL", vector_name] where vector_name contains the delta values.
`Note`: Pre-processing of DATE-columns, e.g. resampling, should be done before calling this function.
"""
is_valid_vector_table(first_vector_table, vector_name)
is_valid_vector_table(second_vector_table, vector_name)
_is_valid_vector_table(first_vector_table, vector_name)
_is_valid_vector_table(second_vector_table, vector_name)

joined_vector_table = first_vector_table.join(
second_vector_table, keys=["DATE", "REAL"], join_type="inner", right_suffix="_second"
)
delta_vector = pc.subtract(
joined_vector_table.column(vector_name), joined_vector_table.column(f"{vector_name}_second")
)

# TODO: Should a schema be defined for the delta vector?
delta_table = pa.table(
{
"DATE": joined_vector_table.column("DATE"),
Expand All @@ -47,3 +53,54 @@ def create_delta_vector_table(
)

return delta_table


class _LapRecorder(Protocol):
    """Minimal interface required for the optional timing instrumentation."""

    def record_lap(self, lap_name: str, /) -> object:
        """Record the time elapsed since the previous lap under `lap_name`."""


async def create_delta_vector_table_async(
    first_access: SummaryAccess,
    second_access: SummaryAccess,
    vector_name: str,
    resampling_frequency: Frequency,
    realizations: list[int] | None,
    perf_metrics: _LapRecorder | None = None,
) -> tuple[pa.Table, VectorMetadata]:
    """
    Fetch the requested vector from two ensembles and create a table with the delta values.

    The vector is requested from both access objects concurrently, using the same resampling
    frequency and realizations for both. The delta is computed as first minus second.

    Performs "inner join". Only obtain matching index ["DATE", "REAL"] - i.e "DATE"-"REAL" combination
    present in only one vector is neglected.

    `Arguments`:
    - `first_access`: Access used to fetch the vector the delta is calculated from (minuend).
    - `second_access`: Access used to fetch the vector that is subtracted (subtrahend).
    - `vector_name`: Name of the vector to fetch from both accesses.
    - `resampling_frequency`: Resampling frequency forwarded to both table requests.
    - `realizations`: Realizations forwarded to both table requests, or None.
    - `perf_metrics`: Optional lap recorder. When provided, the laps "get-vector" and
      "calc-delta-vector" are recorded after the fetch and the delta calculation respectively.

    `Returns` a tuple of
    - a table with columns ["DATE", "REAL", vector_name] where vector_name contains the delta values
    - the vector metadata, currently copied from the vector of the first access

    `Note`: This is a coroutine function, the caller must await the result.
    """
    # Get tables parallel
    # - Resampled data is assumed to be s.t. dates/timestamps are comparable between ensembles and cases, i.e. timestamps
    #   for a resampling of a daily vector in both ensembles should be the same
    (first_vector_table_pa, first_metadata), (second_vector_table_pa, _) = await asyncio.gather(
        first_access.get_vector_table_async(
            vector_name=vector_name,
            resampling_frequency=resampling_frequency,
            realizations=realizations,
        ),
        second_access.get_vector_table_async(
            vector_name=vector_name,
            resampling_frequency=resampling_frequency,
            realizations=realizations,
        ),
    )
    if perf_metrics is not None:
        perf_metrics.record_lap("get-vector")

    # Create delta ensemble data
    delta_table = _create_delta_vector_table(first_vector_table_pa, second_vector_table_pa, vector_name)
    if perf_metrics is not None:
        perf_metrics.record_lap("calc-delta-vector")

    # TODO: Fix correct metadata for delta vector
    # Until then the metadata of the first vector is copied field by field
    delta_vector_metadata = VectorMetadata(
        name=first_metadata.name,
        unit=first_metadata.unit,
        is_total=first_metadata.is_total,
        is_rate=first_metadata.is_rate,
        is_historical=first_metadata.is_historical,
        keyword=first_metadata.keyword,
        wgname=first_metadata.wgname,
        get_num=first_metadata.get_num,
    )

    return delta_table, delta_vector_metadata

0 comments on commit a1c0a3c

Please sign in to comment.