From f797c2ca114bb7418a201ce055739bfba127975b Mon Sep 17 00:00:00 2001 From: Hans Kallekleiv <16436291+HansKallekleiv@users.noreply.github.com> Date: Tue, 31 Oct 2023 15:55:34 +0100 Subject: [PATCH] Improved performance of sumo access --- .../sumo_access/inplace_volumetrics_access.py | 114 ++++++++---------- 1 file changed, 48 insertions(+), 66 deletions(-) diff --git a/backend/src/services/sumo_access/inplace_volumetrics_access.py b/backend/src/services/sumo_access/inplace_volumetrics_access.py index fa1697d30..02269c99c 100644 --- a/backend/src/services/sumo_access/inplace_volumetrics_access.py +++ b/backend/src/services/sumo_access/inplace_volumetrics_access.py @@ -9,12 +9,14 @@ import pyarrow as pa import pyarrow.compute as pc import pyarrow.parquet as pq -from fmu.sumo.explorer.objects import TableCollection +from fmu.sumo.explorer.objects import TableCollection, Case from pydantic import ConfigDict, BaseModel from ._helpers import SumoEnsemble from .generic_types import EnsembleScalarResponse +from ..utils.perf_timer import PerfTimer + # from fmu.sumo.explorer.objects.table import AggregatedTable @@ -57,106 +59,64 @@ def has_value(cls, value: str) -> bool: class InplaceVolumetricsCategoricalMetaData(BaseModel): name: str unique_values: List[Union[str, int, float]] - model_config = ConfigDict(from_attributes=True) # Might be removed + model_config = ConfigDict(from_attributes=True) # Might be removed class InplaceVolumetricsTableMetaData(BaseModel): name: str categorical_column_metadata: List[InplaceVolumetricsCategoricalMetaData] numerical_column_names: List[str] - model_config = ConfigDict(from_attributes=True) # Might be removed + model_config = ConfigDict(from_attributes=True) # Might be removed class InplaceVolumetricsAccess(SumoEnsemble): - async def get_table_names_and_metadata(self) -> List[InplaceVolumetricsTableMetaData]: + async def get_table_names_and_metadata( + self, + ) -> List[InplaceVolumetricsTableMetaData]: """Retrieve the available volumetric tables names and corresponding metadata for the case""" + timer = PerfTimer() vol_table_collections: TableCollection = self._case.tables.filter( aggregation="collection", tagname="vol", iteration=self._iteration_name ) vol_tables_metadata = [] - async for vol_table in vol_table_collections: + vol_table_names = await vol_table_collections.names_async + for vol_table_name in vol_table_names: vol_table_collection: TableCollection = self._case.tables.filter( aggregation="collection", - name=vol_table.name, + name=vol_table_name, tagname="vol", iteration=self._iteration_name, ) + vol_table_column_names = await vol_table_collection.columns_async numerical_column_names = [ - col - for col in vol_table_collection.columns - if PossibleInplaceVolumetricsNumericalColumnNames.has_value(col) + col for col in vol_table_column_names if PossibleInplaceVolumetricsNumericalColumnNames.has_value(col) ] - first_numerical_column_table = self.get_table(vol_table.name, numerical_column_names[0]) + first_numerical_column_table = await _load_arrow_table_for_from_sumo( + self._case, + self._iteration_name, + vol_table_name, + numerical_column_names[0], + ) categorical_column_metadata = [ InplaceVolumetricsCategoricalMetaData( name=col, unique_values=pc.unique(first_numerical_column_table[col]).to_pylist(), ) - for col in vol_table_collection.columns + for col in vol_table_column_names if PossibleInplaceVolumetricsCategoricalColumnNames.has_value(col) ] vol_table_metadata = InplaceVolumetricsTableMetaData( - name=vol_table.name, + name=vol_table_name, categorical_column_metadata=categorical_column_metadata, numerical_column_names=numerical_column_names, ) vol_tables_metadata.append(vol_table_metadata) + LOGGER.debug(f"Got volumetric table names and metadata Sumo in: {timer.elapsed_ms()}ms") return vol_tables_metadata - def get_table(self, table_name: str, column_name: str) -> pa.Table: - vol_table_collection: TableCollection = self._case.tables.filter( - aggregation="collection", - name=table_name, - tagname="vol", - iteration=self._iteration_name, - column=column_name, - ) - if not vol_table_collection: - print(f"No aggregated volumetric tables found {self._case_uuid}, {table_name}, {column_name}") - print("Aggregating manually from realization tables...") - full_table = self.temporary_aggregate_from_realization_tables(table_name) - return full_table.select([column_name, "REAL", "FACIES", "ZONE", "REGION"]) - - if len(vol_table_collection) > 1: - raise ValueError(f"None or multiple volumetric tables found {self._case_uuid}, {table_name}, {column_name}") - vol_table = vol_table_collection[0] - byte_stream: BytesIO = vol_table.blob - table: pa.Table = pq.read_table(byte_stream) - return table - - def temporary_aggregate_from_realization_tables(self, table_name: str) -> pa.Table: - """Temporary function to aggregate from realization tables when no aggregated table is available - Assume Sumo will handle this in the future""" - vol_table_collection: TableCollection = self._case.tables.filter( - stage="realization", - name=table_name, - tagname="vol", - iteration=self._iteration_name, - ) - if not vol_table_collection: - raise ValueError(f"No volumetric realization tables found {self._case_uuid}, {table_name}") - - ### Using ThreadPoolExecutor to parallelize the download of the tables - - def worker(idx: int) -> pd.DataFrame: - vol_table = vol_table_collection[idx] - print(f"Downloading table: {table_name} for realization {vol_table.realization}") - byte_stream: BytesIO = vol_table.blob - - table: pd.DataFrame = pd.read_csv(byte_stream) - table["REAL"] = vol_table.realization - return table - - with ThreadPoolExecutor() as executor: - tables = list(executor.map(worker, list(range(len(vol_table_collection))))) - tables = pd.concat(tables) - tables = pa.Table.from_pandas(tables) - - return tables - - def get_response( + async def get_response( self, table_name: str, column_name: str, @@ -164,7 +124,7 @@ def get_response( realizations: Optional[Sequence[int]] = None, ) -> EnsembleScalarResponse: """Retrieve the volumetric response for the given table name and column name""" - table = self.get_table(table_name, column_name) + table = await _load_arrow_table_for_from_sumo(self._case, self._iteration_name, table_name, column_name) if realizations is not None: mask = pc.is_in(table["REAL"], value_set=pa.array(realizations)) table = table.filter(mask) @@ -172,7 +132,6 @@ def get_response( for category in categorical_filters: mask = pc.is_in(table[category.name], value_set=pa.array(category.unique_values)) table = table.filter(mask) - print(table) summed_on_real_table = table.group_by("REAL").aggregate([(column_name, "sum")]).sort_by("REAL") @@ -180,3 +139,26 @@ def get_response( realizations=summed_on_real_table["REAL"].to_pylist(), values=summed_on_real_table[f"{column_name}_sum"].to_pylist(), ) + + +async def _load_arrow_table_for_from_sumo( + case: Case, iteration_name: str, table_name: str, column_name: str +) -> pa.Table: + timer = PerfTimer() + vol_table_collection: TableCollection = case.tables.filter( + aggregation="collection", + name=table_name, + tagname="vol", + iteration=iteration_name, + column=column_name, + ) + if await vol_table_collection.length_async() == 0: + raise ValueError(f"No volumetric tables found for {case.uuid}, {table_name}, {column_name}") + if await vol_table_collection.length_async() > 1: + raise ValueError(f"Multiple volumetric tables found for {case.uuid}, {table_name}, {column_name}") + + vol_table = await vol_table_collection.getitem_async(0) + byte_stream: BytesIO = await vol_table.blob_async + table: pa.Table = pq.read_table(byte_stream) + LOGGER.debug(f"Loaded volumetric table {table_name} from Sumo in: {timer.elapsed_ms()}ms") + return table