Skip to content

Commit

Permalink
Testing delta ensemble endpoints for timeseries
Browse files Browse the repository at this point in the history
  • Loading branch information
jorgenherje committed Nov 22, 2024
1 parent 11b0231 commit 5dedec1
Show file tree
Hide file tree
Showing 4 changed files with 148 additions and 32 deletions.
110 changes: 80 additions & 30 deletions backend_py/primary/primary/routers/timeseries/router.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,9 +11,12 @@
from primary.services.sumo_access.generic_types import EnsembleScalarResponse
from primary.services.sumo_access.parameter_access import ParameterAccess
from primary.services.sumo_access.summary_access import Frequency, SummaryAccess
from primary.services.sumo_access.summary_types import VectorMetadata
from primary.services.utils.authenticated_user import AuthenticatedUser
from primary.services.utils.timeseries_helpers import create_delta_vector_table

from . import converters, schemas
import asyncio

LOGGER = logging.getLogger(__name__)

Expand Down Expand Up @@ -69,9 +72,11 @@ async def get_delta_ensemble_vector_list(
)
perf_metrics.record_lap("get-access")

# NOTE: Make parallel requests?
first_vector_info_arr = await first_access.get_available_vectors_async()
second_vector_info_arr = await second_access.get_available_vectors_async()
    # Get vectors in parallel
first_vector_info_arr, second_vector_info_arr = await asyncio.gather(
first_access.get_available_vectors_async(),
second_access.get_available_vectors_async(),
)
perf_metrics.record_lap("get-available-vectors")

# Create intersection of vector names
Expand Down Expand Up @@ -161,22 +166,47 @@ async def get_delta_ensemble_realizations_vector_data(

sumo_freq = Frequency.from_string_value(resampling_frequency.value if resampling_frequency else "dummy")

# NOTE: Make parallel requests?
first_sumo_vec_arr = await first_access.get_vector_async(
vector_name=vector_name,
resampling_frequency=sumo_freq,
realizations=realizations,
)
second_sumo_vec_arr = await second_access.get_vector_async(
vector_name=vector_name,
resampling_frequency=sumo_freq,
realizations=realizations,
    # Get tables in parallel
# - Resampled data is assumed to be s.t. dates/timestamps are comparable between ensembles and cases, i.e. timestamps
# for a resampling of a daily vector in both ensembles should be the same
(first_vector_table_pa, first_metadata), (second_vector_table_pa, _) = await asyncio.gather(
first_access.get_vector_table_async(
vector_name=vector_name,
resampling_frequency=sumo_freq,
realizations=realizations,
),
second_access.get_vector_table_async(
vector_name=vector_name,
resampling_frequency=sumo_freq,
realizations=realizations,
),
)
perf_metrics.record_lap("get-vector")

# TODO: Create delta ensemble data
# Create delta ensemble data
delta_table = create_delta_vector_table(first_vector_table_pa, second_vector_table_pa, vector_name)
perf_metrics.record_lap("calc-delta-vector")

# TODO: Fix correct metadata for delta vector
delta_vector_metadata = VectorMetadata(
name=first_metadata.name,
unit=first_metadata.unit,
is_total=first_metadata.is_total,
is_rate=first_metadata.is_rate,
is_historical=first_metadata.is_historical,
keyword=first_metadata.keyword,
wgname=first_metadata.wgname,
get_num=first_metadata.get_num,
)

# TODO: Consider moving this from SummaryAccess to a helper function/util?
delta_vector_arr = SummaryAccess.create_realization_vector_list_from_vector_table_and_metadata(
delta_table, vector_name, delta_vector_metadata
)
perf_metrics.record_lap("create-delta-vector-arr")

ret_arr: list[schemas.VectorRealizationData] = []
for vec in first_sumo_vec_arr:
for vec in delta_vector_arr:
ret_arr.append(
schemas.VectorRealizationData(
realization=vec.realization,
Expand Down Expand Up @@ -312,27 +342,47 @@ async def get_delta_ensemble_statistical_vector_data(
service_freq = Frequency.from_string_value(resampling_frequency.value)
service_stat_funcs_to_compute = converters.to_service_statistic_functions(statistic_functions)

# NOTE: Make parallel requests?
first_vector_table, first_vector_metadata = await first_access.get_vector_table_async(
vector_name=vector_name,
resampling_frequency=service_freq,
realizations=realizations,
)
second_vector_table, second_vector_metadata = await second_access.get_vector_table_async(
vector_name=vector_name,
resampling_frequency=service_freq,
realizations=realizations,
    # Get tables in parallel
# - Resampled data is assumed to be s.t. dates/timestamps are comparable between ensembles and cases, i.e. timestamps
# for a resampling of a daily vector in both ensembles should be the same
(first_vector_table_pa, first_metadata), (second_vector_table_pa, _) = await asyncio.gather(
first_access.get_vector_table_async(
vector_name=vector_name,
resampling_frequency=service_freq,
realizations=realizations,
),
second_access.get_vector_table_async(
vector_name=vector_name,
resampling_frequency=service_freq,
realizations=realizations,
),
)
perf_metrics.record_lap("get-table")
perf_metrics.record_lap("get-vector")

# Create delta ensemble data
delta_table = create_delta_vector_table(first_vector_table_pa, second_vector_table_pa, vector_name)
perf_metrics.record_lap("calc-delta-vector")

statistics = compute_vector_statistics(first_vector_table, vector_name, service_stat_funcs_to_compute)
statistics = compute_vector_statistics(delta_table, vector_name, service_stat_funcs_to_compute)
if not statistics:
raise HTTPException(status_code=404, detail="Could not compute statistics")
perf_metrics.record_lap("calc-stat")
perf_metrics.record_lap("calc-delta-vector-stat")

# TODO: Fix correct metadata for delta vector
delta_vector_metadata = VectorMetadata(
name=first_metadata.name,
unit=first_metadata.unit,
is_total=first_metadata.is_total,
is_rate=first_metadata.is_rate,
is_historical=first_metadata.is_historical,
keyword=first_metadata.keyword,
wgname=first_metadata.wgname,
get_num=first_metadata.get_num,
)

ret_data: schemas.VectorStatisticData = converters.to_api_vector_statistic_data(statistics, first_vector_metadata)
ret_data: schemas.VectorStatisticData = converters.to_api_vector_statistic_data(statistics, delta_vector_metadata)

LOGGER.info(f"Loaded and computed statistical summary data in: {perf_metrics.to_string()}")
LOGGER.info(f"Loaded and computed delta ensemble statistical summary data in: {perf_metrics.to_string()}")

return ret_data

Expand Down
17 changes: 17 additions & 0 deletions backend_py/primary/primary/services/sumo_access/summary_access.py
Original file line number Diff line number Diff line change
Expand Up @@ -159,6 +159,23 @@ async def get_vector_async(
) -> List[RealizationVector]:
table, vector_metadata = await self.get_vector_table_async(vector_name, resampling_frequency, realizations)

return self.create_realization_vector_list_from_vector_table_and_metadata(table, vector_name, vector_metadata)

@staticmethod
def create_realization_vector_list_from_vector_table_and_metadata(
table: pa.Table, vector_name: str, vector_metadata: VectorMetadata
) -> List[RealizationVector]:
"""
Create RealizationVector instances from the provided table and metadata.
    Assumes a table with columns DATE, REAL, and the vector column, where the vector column is of type float32.
"""
# Verify that columns are as we expect
expected_columns = {"DATE", "REAL", vector_name}
if set(table.column_names) != expected_columns:
unexpected_columns = set(table.column_names) - expected_columns
raise InvalidDataError(f"Unexpected columns in table {unexpected_columns}", Service.SUMO)

real_arr_np = table.column("REAL").to_numpy()
unique_reals, first_occurrence_idx, real_counts = np.unique(real_arr_np, return_index=True, return_counts=True)

Expand Down
49 changes: 49 additions & 0 deletions backend_py/primary/primary/services/utils/timeseries_helpers.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
import pyarrow as pa
import pyarrow.compute as pc

from primary.services.service_exceptions import InvalidDataError, Service


def is_valid_vector_table(vector_table: pa.Table, vector_name: str) -> bool:
    """
    Validate that the vector table has exactly the expected columns.

    Expects the table to contain exactly the columns: DATE, REAL, vector_name.

    Returns True if the table is valid.
    Raises InvalidDataError if the table has unexpected or missing columns.
    """
    expected_columns = {"DATE", "REAL", vector_name}
    actual_columns = set(vector_table.column_names)
    if actual_columns != expected_columns:
        # Report both directions of the mismatch: a table that is merely missing the
        # vector column would otherwise produce the misleading "Unexpected columns set()"
        unexpected_columns = actual_columns - expected_columns
        missing_columns = expected_columns - actual_columns
        raise InvalidDataError(
            f"Unexpected columns in table {unexpected_columns}, missing columns {missing_columns}", Service.SUMO
        )

    # Declared return type is bool, so make the success path explicit instead of
    # implicitly returning None
    return True


def create_delta_vector_table(
    first_vector_table: pa.Table, second_vector_table: pa.Table, vector_name: str
) -> pa.Table:
    """
    Build a delta vector table: first ensemble values minus second ensemble values.

    The two input tables are inner-joined on ["DATE", "REAL"], i.e. any DATE/REAL
    combination present in only one of the tables is dropped from the result.

    Note: Any pre-processing of the DATE columns, e.g. resampling, must be done
    before calling this function.

    `Returns` a table with columns ["DATE", "REAL", vector_name], where the
    vector_name column holds the delta values.
    """
    # Raises InvalidDataError if either table lacks the expected columns
    is_valid_vector_table(first_vector_table, vector_name)
    is_valid_vector_table(second_vector_table, vector_name)

    # Inner join keeps only DATE/REAL rows present in both tables; the second
    # table's vector column is suffixed to avoid a name clash
    second_column_name = f"{vector_name}_second"
    joined_table = first_vector_table.join(
        second_vector_table, keys=["DATE", "REAL"], join_type="inner", right_suffix="_second"
    )

    # Element-wise delta: first ensemble minus second ensemble
    delta_values = pc.subtract(joined_table.column(vector_name), joined_table.column(second_column_name))

    # Keep the index columns and append the delta as the vector column
    return joined_table.select(["DATE", "REAL"]).append_column(vector_name, delta_values)
Original file line number Diff line number Diff line change
Expand Up @@ -216,7 +216,7 @@ export const LeftNavBar: React.FC<LeftNavBarProps> = (props) => {
onClick={handleEnsembleClick}
className="w-full !text-slate-800 h-10"
startIcon={
selectedEnsembles.length === 0 && !loadingEnsembleSet ? (
selectedEnsembles.length === 0 && createdDeltaEnsembles.length === 0 && !loadingEnsembleSet ? (
<List fontSize="small" className="w-5 h-5 mr-2" />
) : (
<Badge
Expand All @@ -226,7 +226,7 @@ export const LeftNavBar: React.FC<LeftNavBarProps> = (props) => {
loadingEnsembleSet ? (
<CircularProgress size="extra-small" color="inherit" />
) : (
selectedEnsembles.length
selectedEnsembles.length + createdDeltaEnsembles.length
)
}
>
Expand Down

0 comments on commit 5dedec1

Please sign in to comment.