Skip to content

Commit

Permalink
Introducing delta ensembles - pilot module 'SimulationTimeSeries' (#812)
Browse files Browse the repository at this point in the history
  • Loading branch information
jorgenherje authored Jan 8, 2025
1 parent 5bd371e commit e747034
Show file tree
Hide file tree
Showing 146 changed files with 4,447 additions and 1,137 deletions.
42 changes: 34 additions & 8 deletions backend_py/primary/primary/routers/timeseries/converters.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,14 +31,7 @@ def to_api_vector_statistic_data(
"""
Create API VectorStatisticData from service layer VectorStatistics
"""
value_objects: List[schemas.StatisticValueObject] = []
for api_func_enum in schemas.StatisticFunction:
service_func_enum = StatisticFunction.from_string_value(api_func_enum.value)
if service_func_enum is not None:
value_arr = vector_statistics.values_dict.get(service_func_enum)
if value_arr is not None:
value_objects.append(schemas.StatisticValueObject(statistic_function=api_func_enum, values=value_arr))

value_objects = _create_statistic_value_object_list(vector_statistics)
ret_data = schemas.VectorStatisticData(
realizations=vector_statistics.realizations,
timestamps_utc_ms=vector_statistics.timestamps_utc_ms,
Expand All @@ -48,3 +41,36 @@ def to_api_vector_statistic_data(
)

return ret_data


def to_api_delta_ensemble_vector_statistic_data(
    vector_statistics: VectorStatistics, is_rate: bool, unit: str
) -> schemas.VectorStatisticData:
    """
    Create API VectorStatisticData from service layer VectorStatistics.

    Delta-ensemble variant: rate flag and unit are supplied by the caller
    rather than taken from vector metadata.
    """
    return schemas.VectorStatisticData(
        realizations=vector_statistics.realizations,
        timestamps_utc_ms=vector_statistics.timestamps_utc_ms,
        value_objects=_create_statistic_value_object_list(vector_statistics),
        unit=unit,
        is_rate=is_rate,
    )


def _create_statistic_value_object_list(vector_statistics: VectorStatistics) -> list[schemas.StatisticValueObject]:
    """
    Create list of statistic value objects from vector statistics object.

    Iterates the API statistic functions in declaration order and includes only
    those that map to a service-layer statistic with values present.
    """
    value_objects: list[schemas.StatisticValueObject] = []
    for api_func_enum in schemas.StatisticFunction:
        # Skip API statistics with no service-layer counterpart
        service_func_enum = StatisticFunction.from_string_value(api_func_enum.value)
        if service_func_enum is None:
            continue
        # Skip statistics that were not computed for this vector
        value_arr = vector_statistics.values_dict.get(service_func_enum)
        if value_arr is None:
            continue
        value_objects.append(schemas.StatisticValueObject(statistic_function=api_func_enum, values=value_arr))

    return value_objects
266 changes: 247 additions & 19 deletions backend_py/primary/primary/routers/timeseries/router.py

Large diffs are not rendered by default.

115 changes: 115 additions & 0 deletions backend_py/primary/primary/services/summary_delta_vectors.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,115 @@
from dataclasses import dataclass

import pyarrow as pa
import pyarrow.compute as pc
import numpy as np

from primary.services.service_exceptions import InvalidDataError, Service


@dataclass
class RealizationDeltaVector:
    """Delta vector (comparison minus reference) for a single realization."""

    # Realization number the delta values belong to
    realization: int
    # Timestamps as milliseconds since epoch (UTC), one per value
    timestamps_utc_ms: list[int]
    # Delta values, aligned with timestamps_utc_ms
    values: list[float]
    # True if the vector represents a rate quantity
    is_rate: bool
    # Physical unit of the values
    unit: str


def _validate_summary_vector_table_pa(
    vector_table: pa.Table, vector_name: str, service: Service = Service.GENERAL
) -> None:
    """
    Check if the pyarrow vector table is valid.

    Expect the pyarrow single vector table to only contain the following columns: DATE, REAL, vector_name,
    with types timestamp(ms), int16 and float32 respectively.

    Raises InvalidDataError (attributed to `service`) if the table does not match the expected structure.
    """
    expected_columns = {"DATE", "REAL", vector_name}
    actual_columns = set(vector_table.column_names)
    if actual_columns != expected_columns:
        # Report both directions of the mismatch: the old message only listed
        # "unexpected" columns, which was an empty set when columns were missing.
        missing_columns = expected_columns - actual_columns
        unexpected_columns = actual_columns - expected_columns
        raise InvalidDataError(
            f"Invalid columns in table. Missing columns: {missing_columns or '{}'}. "
            f"Unexpected columns: {unexpected_columns or '{}'}",
            service,
        )

    # Validate table column types
    if vector_table.field("DATE").type != pa.timestamp("ms"):
        raise InvalidDataError(
            f'DATE column must be of type timestamp(ms), but got {vector_table.field("DATE").type}', service
        )
    if vector_table.field("REAL").type != pa.int16():
        raise InvalidDataError("REAL column must be of type int16", service)
    if vector_table.field(vector_name).type != pa.float32():
        raise InvalidDataError(f"{vector_name} column must be of type float32", service)


def create_delta_vector_table(
    comparison_vector_table: pa.Table, reference_vector_table: pa.Table, vector_name: str
) -> pa.Table:
    """
    Create a table with delta values of the requested vector name between the two input tables.

    Definition:
        delta_vector = comparison_vector - reference_vector

    Performs an "inner join" on ["DATE", "REAL"], so any DATE-REAL combination present in only
    one of the tables is neglected.

    Returns: A table with columns ["DATE", "REAL", vector_name] where vector_name contains the delta values.

    `Note`: Pre-processing of DATE-columns, e.g. resampling, should be done before calling this function.
    """
    _validate_summary_vector_table_pa(comparison_vector_table, vector_name)
    _validate_summary_vector_table_pa(reference_vector_table, vector_name)

    reference_suffix = "_reference"
    joined_table = comparison_vector_table.join(
        reference_vector_table, keys=["DATE", "REAL"], join_type="inner", right_suffix=reference_suffix
    )

    # Element-wise subtraction: comparison column minus suffixed reference column
    delta_values = pc.subtract(
        joined_table.column(vector_name), joined_table.column(vector_name + reference_suffix)
    )

    return pa.table(
        {
            "DATE": joined_table.column("DATE"),
            "REAL": joined_table.column("REAL"),
            vector_name: delta_values,
        }
    )


def create_realization_delta_vector_list(
    delta_vector_table: pa.Table, vector_name: str, is_rate: bool, unit: str
) -> list[RealizationDeltaVector]:
    """
    Create a list of RealizationDeltaVector from the delta vector table.

    NOTE(review): assumes rows are grouped contiguously per REAL value (e.g. table sorted
    on REAL), since each realization is sliced from its first occurrence index — confirm
    upstream tables satisfy this.
    """
    _validate_summary_vector_table_pa(delta_vector_table, vector_name)

    real_arr_np = delta_vector_table.column("REAL").to_numpy()
    unique_reals, first_occurrence_idx, real_counts = np.unique(real_arr_np, return_index=True, return_counts=True)

    whole_date_np_arr = delta_vector_table.column("DATE").to_numpy()
    whole_value_np_arr = delta_vector_table.column(vector_name).to_numpy()

    ret_arr: list[RealizationDeltaVector] = []
    for real, start_row_idx, row_count in zip(unique_reals, first_occurrence_idx, real_counts):
        date_np_arr = whole_date_np_arr[start_row_idx : start_row_idx + row_count]
        value_np_arr = whole_value_np_arr[start_row_idx : start_row_idx + row_count]

        ret_arr.append(
            RealizationDeltaVector(
                # int(real): np.unique yields numpy scalars (int16 here); the dataclass
                # declares a plain int and numpy scalars can break downstream serialization.
                realization=int(real),
                # astype(np.int64) rather than astype(int): the platform-default np.int_
                # is 32-bit on some platforms, which would overflow ms-epoch timestamps.
                timestamps_utc_ms=date_np_arr.astype(np.int64).tolist(),
                values=value_np_arr.tolist(),
                is_rate=is_rate,
                unit=unit,
            )
        )

    return ret_arr
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,11 @@
from fmu.sumo.explorer.objects import TableCollection, Table
from webviz_pkg.core_utils.perf_timer import PerfTimer

from primary.services.utils.arrow_helpers import sort_table_on_real_then_date, is_date_column_monotonically_increasing
from primary.services.utils.arrow_helpers import find_first_non_increasing_date_pair
from primary.services.utils.arrow_helpers import (
find_first_non_increasing_date_pair,
sort_table_on_real_then_date,
is_date_column_monotonically_increasing,
)
from primary.services.service_exceptions import (
Service,
NoDataError,
Expand Down
192 changes: 192 additions & 0 deletions backend_py/primary/tests/unit/services/test_summary_delta_vectors.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,192 @@
import pytest
import pyarrow as pa

from primary.services.service_exceptions import InvalidDataError, Service
from primary.services.summary_delta_vectors import (
create_delta_vector_table,
create_realization_delta_vector_list,
RealizationDeltaVector,
_validate_summary_vector_table_pa,
)


VECTOR_TABLE_SCHEMA = pa.schema([("DATE", pa.timestamp("ms")), ("REAL", pa.int16()), ("vector", pa.float32())])


def test_create_delta_vector_table() -> None:
    """Delta table holds comparison minus reference on matching (DATE, REAL) rows."""
    comparison_table = pa.table(
        {"DATE": [1, 2, 3, 4], "REAL": [1, 1, 2, 2], "vector": [10.0, 20.0, 30.0, 40.0]}, schema=VECTOR_TABLE_SCHEMA
    )
    reference_table = pa.table(
        {"DATE": [1, 2, 3, 4], "REAL": [1, 1, 2, 2], "vector": [5.0, 15.0, 25.0, 35.0]}, schema=VECTOR_TABLE_SCHEMA
    )
    expected_table = pa.table(
        {"DATE": [1, 2, 3, 4], "REAL": [1, 1, 2, 2], "vector": [5.0, 5.0, 5.0, 5.0]}, schema=VECTOR_TABLE_SCHEMA
    )

    result_table = create_delta_vector_table(comparison_table, reference_table, "vector")

    assert result_table.equals(expected_table)


def test_create_delta_vector_table_with_missing_dates() -> None:
    """DATE/REAL combinations present in only one table are dropped (inner join)."""
    comparison_table = pa.table(
        {"DATE": [1, 2, 4], "REAL": [1, 1, 2], "vector": [10.0, 20.0, 40.0]}, schema=VECTOR_TABLE_SCHEMA
    )
    reference_table = pa.table(
        {"DATE": [1, 2, 3], "REAL": [1, 1, 2], "vector": [5.0, 15.0, 25.0]}, schema=VECTOR_TABLE_SCHEMA
    )
    expected_table = pa.table({"DATE": [1, 2], "REAL": [1, 1], "vector": [5.0, 5.0]}, schema=VECTOR_TABLE_SCHEMA)

    result_table = create_delta_vector_table(comparison_table, reference_table, "vector")

    assert result_table.equals(expected_table)


def test_create_delta_vector_table_with_different_reals() -> None:
    """Realizations present in only one table are dropped (inner join)."""
    comparison_table = pa.table(
        {"DATE": [1, 2, 3, 4], "REAL": [1, 1, 2, 3], "vector": [10.0, 20.0, 30.0, 40.0]}, schema=VECTOR_TABLE_SCHEMA
    )
    reference_table = pa.table(
        {"DATE": [1, 2, 3, 4], "REAL": [1, 1, 2, 2], "vector": [5.0, 15.0, 25.0, 35.0]}, schema=VECTOR_TABLE_SCHEMA
    )
    expected_table = pa.table(
        {"DATE": [1, 2, 3], "REAL": [1, 1, 2], "vector": [5.0, 5.0, 5.0]}, schema=VECTOR_TABLE_SCHEMA
    )

    result_table = create_delta_vector_table(comparison_table, reference_table, "vector")

    assert result_table.equals(expected_table)


def test_create_realization_delta_vector_list() -> None:
    """Rows are grouped per realization into RealizationDeltaVector instances."""
    delta_table = pa.table(
        {"DATE": [1, 2, 3, 4], "REAL": [1, 1, 2, 2], "vector": [5.0, 10.0, 15.0, 20.0]}, schema=VECTOR_TABLE_SCHEMA
    )

    result = create_realization_delta_vector_list(delta_table, "vector", is_rate=True, unit="unit")

    assert result == [
        RealizationDeltaVector(realization=1, timestamps_utc_ms=[1, 2], values=[5.0, 10.0], is_rate=True, unit="unit"),
        RealizationDeltaVector(realization=2, timestamps_utc_ms=[3, 4], values=[15.0, 20.0], is_rate=True, unit="unit"),
    ]


def test_create_realization_delta_vector_list_with_single_real() -> None:
    """A table with one realization yields a single RealizationDeltaVector."""
    delta_table = pa.table(
        {"DATE": [1, 2, 3, 4], "REAL": [1, 1, 1, 1], "vector": [5.0, 10.0, 15.0, 20.0]}, schema=VECTOR_TABLE_SCHEMA
    )

    result = create_realization_delta_vector_list(delta_table, "vector", is_rate=False, unit="unit")

    assert result == [
        RealizationDeltaVector(
            realization=1, timestamps_utc_ms=[1, 2, 3, 4], values=[5.0, 10.0, 15.0, 20.0], is_rate=False, unit="unit"
        )
    ]


def test_create_realization_delta_vector_list_with_empty_table() -> None:
    """An empty delta table yields an empty list."""
    empty_table = pa.table({"DATE": [], "REAL": [], "vector": []}, schema=VECTOR_TABLE_SCHEMA)

    result = create_realization_delta_vector_list(empty_table, "vector", is_rate=True, unit="unit")

    assert result == []


def test_validate_summary_vector_table_pa_valid() -> None:
    """A table with exactly the expected columns and types passes validation."""
    vector_name = "VECTOR"
    schema = pa.schema([("DATE", pa.timestamp("ms")), ("REAL", pa.int16()), (vector_name, pa.float32())])
    table = pa.table({"DATE": [1, 2, 3], "REAL": [4, 5, 6], vector_name: [7.0, 8.0, 9.0]}, schema=schema)

    try:
        _validate_summary_vector_table_pa(table, vector_name)
    except InvalidDataError:
        pytest.fail("validate_summary_vector_table_pa raised InvalidDataError unexpectedly!")


def test_validate_summary_vector_table_pa_missing_column() -> None:
    """Validation fails when the requested vector column is absent."""
    vector_name = "VECTOR"
    schema = pa.schema([("DATE", pa.timestamp("ms")), ("REAL", pa.int16())])
    table = pa.table({"DATE": [1, 2, 3], "REAL": [4, 5, 6]}, schema=schema)

    with pytest.raises(InvalidDataError):
        _validate_summary_vector_table_pa(table, vector_name)


def test_validate_summary_vector_table_pa_unexpected_column() -> None:
    """Validation fails when an extra column is present."""
    vector_name = "VECTOR"
    schema = pa.schema(
        [("DATE", pa.timestamp("ms")), ("REAL", pa.int16()), (vector_name, pa.float32()), ("EXTRA", pa.float32())]
    )
    table = pa.table(
        {"DATE": [1, 2, 3], "REAL": [4, 5, 6], vector_name: [7.0, 8.0, 9.0], "EXTRA": [10.0, 11.0, 12.0]},
        schema=schema,
    )

    with pytest.raises(InvalidDataError):
        _validate_summary_vector_table_pa(table, vector_name)


def test_validate_summary_vector_table_pa_invalid_date_type() -> None:
    """Validation fails when DATE is not timestamp(ms)."""
    vector_name = "VECTOR"
    schema = pa.schema([("DATE", pa.int32()), ("REAL", pa.int16()), (vector_name, pa.float32())])
    table = pa.table({"DATE": [1, 2, 3], "REAL": [4, 5, 6], vector_name: [7.0, 8.0, 9.0]}, schema=schema)

    with pytest.raises(InvalidDataError):
        _validate_summary_vector_table_pa(table, vector_name)


def test_validate_summary_vector_table_pa_invalid_real_type() -> None:
    """Validation fails when REAL is not int16."""
    vector_name = "VECTOR"
    schema = pa.schema([("DATE", pa.timestamp("ms")), ("REAL", pa.float32()), (vector_name, pa.float32())])
    table = pa.table({"DATE": [1, 2, 3], "REAL": [4.0, 5.0, 6.0], vector_name: [7.0, 8.0, 9.0]}, schema=schema)

    with pytest.raises(InvalidDataError):
        _validate_summary_vector_table_pa(table, vector_name)


def test_validate_summary_vector_table_pa_invalid_vector_type() -> None:
    """Validation fails when the vector column is not float32."""
    vector_name = "VECTOR"
    schema = pa.schema([("DATE", pa.timestamp("ms")), ("REAL", pa.int16()), (vector_name, pa.int32())])
    table = pa.table({"DATE": [1, 2, 3], "REAL": [4, 5, 6], vector_name: [7, 8, 9]}, schema=schema)

    with pytest.raises(InvalidDataError):
        _validate_summary_vector_table_pa(table, vector_name)


def test_validate_summary_vector_table_pa_sumo_service() -> None:
    """The raised InvalidDataError is attributed to the service passed in."""
    vector_name = "VECTOR"
    schema = pa.schema([("DATE", pa.timestamp("ms")), ("REAL", pa.int16())])
    table = pa.table({"DATE": [1, 2, 3], "REAL": [4, 5, 6]}, schema=schema)

    with pytest.raises(InvalidDataError) as excinfo:
        _validate_summary_vector_table_pa(table, vector_name, Service.SUMO)
    assert excinfo.value.service == Service.SUMO
11 changes: 3 additions & 8 deletions frontend/src/App.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -84,15 +84,10 @@ function App() {

setIsMounted(true);

const storedEnsembleIdents = workbench.maybeLoadEnsembleSettingsFromLocalStorage();
if (storedEnsembleIdents) {
setInitAppState(InitAppState.LoadingEnsembles);
workbench.loadAndSetupEnsembleSetInSession(queryClient, storedEnsembleIdents).finally(() => {
initApp();
});
} else {
// Initialize the workbench
workbench.initWorkbenchFromLocalStorage(queryClient).finally(() => {
initApp();
}
});

return function handleUnmount() {
workbench.clearLayout();
Expand Down
Loading

0 comments on commit e747034

Please sign in to comment.