Merge branch 'main' of github.com:equinor/webviz into spike-layer-system
HansKallekleiv committed Sep 13, 2024
2 parents 626ac02 + f480158 commit 892e5a5
Showing 36 changed files with 1,771 additions and 20 deletions.
2 changes: 2 additions & 0 deletions backend_py/primary/primary/main.py
@@ -28,6 +28,7 @@
from primary.routers.seismic.router import router as seismic_router
from primary.routers.surface.router import router as surface_router
from primary.routers.timeseries.router import router as timeseries_router
+from primary.routers.vfp.router import router as vfp_router
from primary.routers.well.router import router as well_router
from primary.routers.well_completions.router import router as well_completions_router
from primary.utils.azure_monitor_setup import setup_azure_monitor_telemetry
@@ -87,6 +88,7 @@ def custom_generate_unique_id(route: APIRoute) -> str:
app.include_router(graph_router, prefix="/graph", tags=["graph"])
app.include_router(observations_router, prefix="/observations", tags=["observations"])
app.include_router(rft_router, prefix="/rft", tags=["rft"])
+app.include_router(vfp_router, prefix="/vfp", tags=["vfp"])
app.include_router(dev_router, prefix="/dev", tags=["dev"], include_in_schema=False)

auth_helper = AuthHelper()
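For context, a minimal, self-contained sketch of the include_router pattern used above. The app and stub handler below are illustrative stand-ins, not code from this repository:

from fastapi import APIRouter, FastAPI

demo_router = APIRouter()


@demo_router.get("/vfp_table_names/")
async def vfp_table_names_stub() -> list[str]:
    # Stub standing in for the real Sumo-backed handler
    return ["example_vfp_table"]


app = FastAPI()
# Mounting with prefix="/vfp" makes the route reachable at GET /vfp/vfp_table_names/,
# mirroring how vfp_router is registered above
app.include_router(demo_router, prefix="/vfp", tags=["vfp"])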
4 changes: 1 addition & 3 deletions backend_py/primary/primary/routers/surface/router.py
@@ -152,9 +152,6 @@ async def get_surface_data(
        raise HTTPException(status_code=404, detail="Could not get realization surface")

    elif addr.address_type == "STAT":
-        if addr.stat_realizations is not None:
-            raise HTTPException(status_code=501, detail="Statistics with specific realizations not yet supported")
-
        service_stat_func_to_compute = StatisticFunction.from_string_value(addr.stat_function)
        if service_stat_func_to_compute is None:
            raise HTTPException(status_code=404, detail="Invalid statistic requested")
@@ -164,6 +161,7 @@ async def get_surface_data(
            statistic_function=service_stat_func_to_compute,
            name=addr.name,
            attribute=addr.attribute,
+            realizations=addr.stat_realizations,
            time_or_interval_str=addr.iso_time_or_interval,
        )
        perf_metrics.record_lap("sumo-calc")
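With the 501 guard removed, addr.stat_realizations is forwarded to the service layer instead of being rejected. A minimal sketch of the intended semantics; the helper below is hypothetical, written to match the service-side validation shown further down:

from typing import Sequence


def check_stat_realizations(realizations: Sequence[int] | None) -> None:
    # None acts as a wildcard: statistics are computed over all realizations
    # An explicit but empty list is invalid and must be rejected
    if realizations is not None and len(realizations) == 0:
        raise ValueError("List of realizations cannot be empty")


check_stat_realizations(None)       # OK: include all realizations
check_stat_realizations([0, 1, 2])  # OK: subset of realizations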
69 changes: 69 additions & 0 deletions backend_py/primary/primary/routers/vfp/router.py
@@ -0,0 +1,69 @@
import logging
from typing import List

from fastapi import APIRouter, Depends, Query, Response, HTTPException

from primary.auth.auth_helper import AuthHelper
from primary.utils.response_perf_metrics import ResponsePerfMetrics
from primary.services.sumo_access.vfp_access import VfpAccess
from primary.services.sumo_access.vfp_types import VfpProdTable
from primary.services.utils.authenticated_user import AuthenticatedUser

from . import schemas

LOGGER = logging.getLogger(__name__)

router = APIRouter()


@router.get("/vfp_table_names/")
async def get_vfp_table_names(
    # fmt:off
    response: Response,
    authenticated_user: AuthenticatedUser = Depends(AuthHelper.get_authenticated_user),
    case_uuid: str = Query(description="Sumo case uuid"),
    ensemble_name: str = Query(description="Ensemble name"),
    realization: int = Query(description="Realization"),
    # fmt:on
) -> List[str]:
    perf_metrics = ResponsePerfMetrics(response)

    vfp_access = await VfpAccess.from_case_uuid_async(
        authenticated_user.get_sumo_access_token(), case_uuid, ensemble_name
    )
    perf_metrics.record_lap("get-access")
    vfp_table_names = await vfp_access.get_all_vfp_table_names_for_realization(realization=realization)
    perf_metrics.record_lap("get-available-vfp-table-names")
    LOGGER.info(f"All VFP table names loaded in: {perf_metrics.to_string()}")

    return vfp_table_names


@router.get("/vfp_table/")
async def get_vfp_table(
    # fmt:off
    response: Response,
    authenticated_user: AuthenticatedUser = Depends(AuthHelper.get_authenticated_user),
    case_uuid: str = Query(description="Sumo case uuid"),
    ensemble_name: str = Query(description="Ensemble name"),
    realization: int = Query(description="Realization"),
    vfp_table_name: str = Query(description="VFP table name")
    # fmt:on
) -> VfpProdTable:
    perf_metrics = ResponsePerfMetrics(response)

    vfp_access = await VfpAccess.from_case_uuid_async(
        authenticated_user.get_sumo_access_token(), case_uuid, ensemble_name
    )
    perf_metrics.record_lap("get-access")
    try:
        vfp_table: VfpProdTable = await vfp_access.get_vfpprod_table_from_tagname(
            tagname=vfp_table_name, realization=realization
        )
    except NotImplementedError as ex:
        # Pass a string, not the exception object, so the detail is JSON-serializable
        raise HTTPException(status_code=404, detail=str(ex)) from ex

    perf_metrics.record_lap("get-vfp-table")
    LOGGER.info(f"VFP table loaded in: {perf_metrics.to_string()}")

    return vfp_table
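A hypothetical client call against a locally running primary backend; the base URL and the omission of explicit authentication are assumptions, not part of this commit:

import asyncio

import httpx

BASE_URL = "http://localhost:5000"  # assumed dev-server address


async def fetch_vfp_table_names(case_uuid: str, ensemble_name: str, realization: int) -> list[str]:
    # Query parameters mirror the get_vfp_table_names endpoint above
    params = {"case_uuid": case_uuid, "ensemble_name": ensemble_name, "realization": realization}
    async with httpx.AsyncClient(base_url=BASE_URL) as client:
        resp = await client.get("/vfp/vfp_table_names/", params=params)
        resp.raise_for_status()
        return resp.json()


# Example: asyncio.run(fetch_vfp_table_names("<case-uuid>", "iter-0", 0))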
Empty file.
47 changes: 35 additions & 12 deletions backend_py/primary/primary/services/sumo_access/surface_access.py
@@ -1,7 +1,7 @@
import asyncio
import logging
from io import BytesIO
-from typing import Optional
+from typing import Sequence

import xtgeo

@@ -118,10 +118,11 @@ async def get_observed_surfaces_metadata_async(self) -> SurfaceMetaSet:
        return surf_meta_set

    async def get_realization_surface_data_async(
-        self, real_num: int, name: str, attribute: str, time_or_interval_str: Optional[str] = None
-    ) -> Optional[xtgeo.RegularSurface]:
+        self, real_num: int, name: str, attribute: str, time_or_interval_str: str | None = None
+    ) -> xtgeo.RegularSurface | None:
        """
        Get surface data for a realization surface
+        If time_or_interval_str is None, only surfaces with no time information will be considered.
        """
        if not self._iteration_name:
            raise InvalidParameterError("Iteration name must be set to get realization surface", Service.SUMO)
@@ -148,7 +149,7 @@ async def get_realization_surface_data_async(
f"Multiple ({surf_count}) surfaces found in Sumo for: {surf_str}", Service.SUMO
)
if surf_count == 0:
LOGGER.warning(f"No realization surface found in Sumo for {surf_str}")
LOGGER.warning(f"No realization surface found in Sumo for: {surf_str}")
return None

sumo_surf: Surface = await surface_collection.getitem_async(0)
@@ -170,7 +171,7 @@ async def get_realization_surface_data_async(

    async def get_observed_surface_data_async(
        self, name: str, attribute: str, time_or_interval_str: str
-    ) -> Optional[xtgeo.RegularSurface]:
+    ) -> xtgeo.RegularSurface | None:
        """
        Get surface data for an observed surface
        """
@@ -218,17 +219,25 @@ async def get_statistical_surface_data_async(
        statistic_function: StatisticFunction,
        name: str,
        attribute: str,
-        time_or_interval_str: Optional[str] = None,
-    ) -> Optional[xtgeo.RegularSurface]:
+        realizations: Sequence[int] | None = None,
+        time_or_interval_str: str | None = None,
+    ) -> xtgeo.RegularSurface | None:
        """
        Compute statistic and return surface data
+        If realizations is None, this is interpreted as a wildcard and surfaces from all realizations will be
+        included in the statistics. The list of realizations cannot be empty.
+        If time_or_interval_str is None, only surfaces with no time information will be considered.
        """
        if not self._iteration_name:
            raise InvalidParameterError("Iteration name must be set to get realization surfaces", Service.SUMO)

+        if realizations is not None:
+            if len(realizations) == 0:
+                raise InvalidParameterError("List of realizations cannot be empty", Service.SUMO)
+
        perf_metrics = PerfMetrics()

-        surf_str = self._make_real_surf_log_str(-1, name, attribute, time_or_interval_str)
+        surf_str = self._make_stat_surf_log_str(name, attribute, time_or_interval_str)

        time_filter = _time_or_interval_str_to_time_filter(time_or_interval_str)

@@ -238,28 +247,38 @@
            iteration=self._iteration_name,
            name=name,
            tagname=attribute,
+            realization=realizations,
            time=time_filter,
        )

        surf_count = await surface_collection.length_async()
        if surf_count == 0:
-            LOGGER.warning(f"No statistical source surfaces found in Sumo for {surf_str}")
+            LOGGER.warning(f"No statistical source surfaces found in Sumo for: {surf_str}")
            return None
        perf_metrics.record_lap("locate")

-        realizations = await surface_collection.realizations_async
+        realizations_found = await surface_collection.realizations_async
        perf_metrics.record_lap("collect-reals")

+        # Ensure that we got data for all the requested realizations
+        if realizations is not None:
+            missing_reals = list(set(realizations) - set(realizations_found))
+            if len(missing_reals) > 0:
+                raise InvalidParameterError(
+                    f"Could not find source surfaces for realizations: {missing_reals} in Sumo for {surf_str}",
+                    Service.SUMO,
+                )
+
        xtgeo_surf = await _compute_statistical_surface_async(statistic_function, surface_collection)
        perf_metrics.record_lap("calc-stat")

        if not xtgeo_surf:
-            LOGGER.warning(f"Could not calculate statistical surface using Sumo for {surf_str}")
+            LOGGER.warning(f"Could not calculate statistical surface using Sumo for: {surf_str}")
            return None

        LOGGER.debug(
            f"Calculated statistical surface using Sumo in: {perf_metrics.to_string()} "
-            f"({surf_str} {len(realizations)=})"
+            f"[{xtgeo_surf.ncol}x{xtgeo_surf.nrow}, real count: {len(realizations_found)}] ({surf_str})"
        )

        return xtgeo_surf
@@ -272,6 +291,10 @@ def _make_obs_surf_log_str(self, name: str, attribute: str, date_str: str) -> str:
addr_str = f"N={name}, A={attribute}, D={date_str}, C={self._case_uuid}"
return addr_str

def _make_stat_surf_log_str(self, name: str, attribute: str, date_str: str | None) -> str:
addr_str = f"N={name}, A={attribute}, D={date_str}, C={self._case_uuid}, I={self._iteration_name}"
return addr_str


def _build_surface_meta_arr(
src_surf_info_arr: list[SurfInfo], time_type: SurfTimeType, are_observations: bool
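Illustrative usage of the extended statistical-surface API, assuming a SurfaceAccess instance and the StatisticFunction enum are already in scope (the class name, the MEAN member, and the surface name/attribute are assumptions):

async def mean_surface_example(surface_access: "SurfaceAccess") -> None:
    surf = await surface_access.get_statistical_surface_data_async(
        statistic_function=StatisticFunction.MEAN,  # assumed enum member
        name="some_surface",               # placeholder surface name
        attribute="some_attribute",        # placeholder attribute
        realizations=[0, 1, 2],            # subset; None would include all realizations
        time_or_interval_str=None,         # only time-independent surfaces are considered
    )
    if surf is None:
        return  # no source surfaces found
    print(f"Computed {surf.ncol} x {surf.nrow} statistical surface")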
122 changes: 122 additions & 0 deletions backend_py/primary/primary/services/sumo_access/vfp_access.py
@@ -0,0 +1,122 @@
import logging
from io import BytesIO
from typing import Any, Dict, List

import numpy as np
import pyarrow as pa
import pyarrow.parquet as pq
from fmu.sumo.explorer.objects import Case
from primary.services.service_exceptions import MultipleDataMatchesError, NoDataError, Service

from ._helpers import create_sumo_case_async, create_sumo_client
from .vfp_types import (
    ALQ,
    GFR,
    WFR,
    FlowRateTypeProd,
    TabType,
    UnitType,
    VfpProdTable,
    VfpType,
    VfpParam,
    VFPPROD_UNITS,
    THP,
)

LOGGER = logging.getLogger(__name__)


class VfpAccess:
    """
    Class for accessing and retrieving Vfp tables
    """

    def __init__(self, case: Case, iteration_name: str):
        self._case = case
        self._iteration_name = iteration_name

    @classmethod
    async def from_case_uuid_async(cls, access_token: str, case_uuid: str, iteration_name: str) -> "VfpAccess":
        sumo_client = create_sumo_client(access_token)
        case: Case = await create_sumo_case_async(sumo_client, case_uuid, want_keepalive_pit=False)
        return VfpAccess(case, iteration_name)

    async def get_all_vfp_table_names_for_realization(self, realization: int) -> List[str]:
        """Returns all VFP table names/tagnames for a realization."""
        table_collection = self._case.tables.filter(
            content="lift_curves", realization=realization, iteration=self._iteration_name
        )
        table_count = await table_collection.length_async()
        if table_count == 0:
            raise NoDataError(f"No VFP tables found for realization: {realization}", Service.SUMO)
        return table_collection.tagnames

    async def get_vfp_table_from_tagname_as_pyarrow(self, tagname: str, realization: int) -> pa.Table:
        """Returns a VFP table as a pyarrow table for a specific tagname (table name)
        and realization.
        """

        table_collection = self._case.tables.filter(
            tagname=tagname, realization=realization, iteration=self._iteration_name
        )

        table_count = await table_collection.length_async()
        if table_count == 0:
            raise NoDataError(
                f"No VFP table found with tagname: {tagname} and realization: {realization}", Service.SUMO
            )
        if table_count > 1:
            raise MultipleDataMatchesError(
                f"Multiple VFP tables found with tagname: {tagname} and realization: {realization}", Service.SUMO
            )

        sumo_table = await table_collection.getitem_async(0)
        byte_stream: BytesIO = await sumo_table.blob_async
        pa_table: pa.Table = pq.read_table(byte_stream)

        return pa_table

    async def get_vfpprod_table_from_tagname(self, tagname: str, realization: int) -> VfpProdTable:
        """Returns a VFP table as a VFP table object for a specific tagname (table name)
        and realization.
        """
        if tagname.lower().startswith("vfpinj"):
            raise NotImplementedError("VFPINJ not implemented.")

        pa_table = await self.get_vfp_table_from_tagname_as_pyarrow(tagname, realization)

        alq_type = ALQ.UNDEFINED
        if pa_table.schema.metadata[b"ALQ_TYPE"].decode("utf-8") != "''":
            alq_type = ALQ[pa_table.schema.metadata[b"ALQ_TYPE"].decode("utf-8")]

        unit_type = UnitType[pa_table.schema.metadata[b"UNIT_TYPE"].decode("utf-8")]
        thp_type = THP[pa_table.schema.metadata[b"THP_TYPE"].decode("utf-8")]
        wfr_type = WFR[pa_table.schema.metadata[b"WFR_TYPE"].decode("utf-8")]
        gfr_type = GFR[pa_table.schema.metadata[b"GFR_TYPE"].decode("utf-8")]
        flow_rate_type = FlowRateTypeProd[pa_table.schema.metadata[b"RATE_TYPE"].decode("utf-8")]
        units: Dict[VfpParam, Any] = VFPPROD_UNITS[unit_type]

        return VfpProdTable(
            vfp_type=VfpType[pa_table.schema.metadata[b"VFP_TYPE"].decode("utf-8")],
            table_number=int(pa_table.schema.metadata[b"TABLE_NUMBER"].decode("utf-8")),
            datum=float(pa_table.schema.metadata[b"DATUM"].decode("utf-8")),
            thp_type=thp_type,
            wfr_type=wfr_type,
            gfr_type=gfr_type,
            alq_type=alq_type,
            flow_rate_type=flow_rate_type,
            unit_type=unit_type,
            tab_type=TabType[pa_table.schema.metadata[b"TAB_TYPE"].decode("utf-8")],
            thp_values=np.frombuffer(pa_table.schema.metadata[b"THP_VALUES"], dtype=np.float64).tolist(),
            wfr_values=np.frombuffer(pa_table.schema.metadata[b"WFR_VALUES"], dtype=np.float64).tolist(),
            gfr_values=np.frombuffer(pa_table.schema.metadata[b"GFR_VALUES"], dtype=np.float64).tolist(),
            alq_values=np.frombuffer(pa_table.schema.metadata[b"ALQ_VALUES"], dtype=np.float64).tolist(),
            flow_rate_values=np.frombuffer(pa_table.schema.metadata[b"FLOW_VALUES"], dtype=np.float64).tolist(),
            bhp_values=[val for sublist in np.array(pa_table.columns).tolist() for val in sublist],
            flow_rate_unit=units[VfpParam.FLOWRATE][flow_rate_type],
            thp_unit=units[VfpParam.THP][thp_type],
            wfr_unit=units[VfpParam.WFR][wfr_type],
            gfr_unit=units[VfpParam.GFR][gfr_type],
            alq_unit=units[VfpParam.ALQ][alq_type],
            bhp_unit=units[VfpParam.THP][thp_type],
        )
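A round-trip sketch of the metadata layout this reader assumes: each axis array is stored as raw float64 bytes (native byte order) under a schema-metadata key. The writing side here is purely illustrative; only the decoding mirrors get_vfpprod_table_from_tagname above:

import numpy as np
import pyarrow as pa

# Encode one axis array the way the reader expects to find it
thp_values = np.array([10.0, 20.0, 30.0], dtype=np.float64)
schema = pa.schema([pa.field("bhp_0", pa.float64())]).with_metadata(
    {b"THP_VALUES": thp_values.tobytes()}
)
table = pa.table({"bhp_0": [101.3, 99.8, 97.2]}, schema=schema)

# Decode exactly as the reader does
decoded = np.frombuffer(table.schema.metadata[b"THP_VALUES"], dtype=np.float64).tolist()
assert decoded == [10.0, 20.0, 30.0]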