Skip to content

Commit

Permalink
Improved performance of Sumo access
Browse files Browse the repository at this point in the history
  • Loading branch information
HansKallekleiv committed Oct 31, 2023
1 parent 46088c7 commit f797c2c
Showing 1 changed file with 48 additions and 66 deletions.
114 changes: 48 additions & 66 deletions backend/src/services/sumo_access/inplace_volumetrics_access.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,12 +9,14 @@
import pyarrow as pa
import pyarrow.compute as pc
import pyarrow.parquet as pq
from fmu.sumo.explorer.objects import TableCollection
from fmu.sumo.explorer.objects import TableCollection, Case
from pydantic import ConfigDict, BaseModel

from ._helpers import SumoEnsemble
from .generic_types import EnsembleScalarResponse

from ..utils.perf_timer import PerfTimer

# from fmu.sumo.explorer.objects.table import AggregatedTable


Expand Down Expand Up @@ -57,126 +59,106 @@ def has_value(cls, value: str) -> bool:
class InplaceVolumetricsCategoricalMetaData(BaseModel):
name: str
unique_values: List[Union[str, int, float]]
model_config = ConfigDict(from_attributes=True) # Might be removed
model_config = ConfigDict(from_attributes=True) # Might be removed


class InplaceVolumetricsTableMetaData(BaseModel):
name: str
categorical_column_metadata: List[InplaceVolumetricsCategoricalMetaData]
numerical_column_names: List[str]
model_config = ConfigDict(from_attributes=True) # Might be removed
model_config = ConfigDict(from_attributes=True) # Might be removed


class InplaceVolumetricsAccess(SumoEnsemble):
async def get_table_names_and_metadata(self) -> List[InplaceVolumetricsTableMetaData]:
async def get_table_names_and_metadata(
self,
) -> List[InplaceVolumetricsTableMetaData]:
"""Retrieve the available volumetric tables names and corresponding metadata for the case"""
timer = PerfTimer()
vol_table_collections: TableCollection = self._case.tables.filter(
aggregation="collection", tagname="vol", iteration=self._iteration_name
)

vol_tables_metadata = []
async for vol_table in vol_table_collections:
vol_table_names = await vol_table_collections.names_async
for vol_table_name in vol_table_names:
vol_table_collection: TableCollection = self._case.tables.filter(
aggregation="collection",
name=vol_table.name,
name=vol_table_name,
tagname="vol",
iteration=self._iteration_name,
)
vol_table_column_names = await vol_table_collection.columns_async
numerical_column_names = [
col
for col in vol_table_collection.columns
if PossibleInplaceVolumetricsNumericalColumnNames.has_value(col)
col for col in vol_table_column_names if PossibleInplaceVolumetricsNumericalColumnNames.has_value(col)
]
first_numerical_column_table = self.get_table(vol_table.name, numerical_column_names[0])
first_numerical_column_table = await _load_arrow_table_for_from_sumo(
self._case,
self._iteration_name,
vol_table_name,
numerical_column_names[0],
)
categorical_column_metadata = [
InplaceVolumetricsCategoricalMetaData(
name=col,
unique_values=pc.unique(first_numerical_column_table[col]).to_pylist(),
)
for col in vol_table_collection.columns
for col in vol_table_column_names
if PossibleInplaceVolumetricsCategoricalColumnNames.has_value(col)
]
vol_table_metadata = InplaceVolumetricsTableMetaData(
name=vol_table.name,
name=vol_table_name,
categorical_column_metadata=categorical_column_metadata,
numerical_column_names=numerical_column_names,
)

vol_tables_metadata.append(vol_table_metadata)
LOGGER.debug(f"Got volumetric table names and metadata Sumo in: {timer.elapsed_ms()}ms")
return vol_tables_metadata

def get_table(self, table_name: str, column_name: str) -> pa.Table:
vol_table_collection: TableCollection = self._case.tables.filter(
aggregation="collection",
name=table_name,
tagname="vol",
iteration=self._iteration_name,
column=column_name,
)
if not vol_table_collection:
print(f"No aggregated volumetric tables found {self._case_uuid}, {table_name}, {column_name}")
print("Aggregating manually from realization tables...")
full_table = self.temporary_aggregate_from_realization_tables(table_name)
return full_table.select([column_name, "REAL", "FACIES", "ZONE", "REGION"])

if len(vol_table_collection) > 1:
raise ValueError(f"None or multiple volumetric tables found {self._case_uuid}, {table_name}, {column_name}")
vol_table = vol_table_collection[0]
byte_stream: BytesIO = vol_table.blob
table: pa.Table = pq.read_table(byte_stream)
return table

def temporary_aggregate_from_realization_tables(self, table_name: str) -> pa.Table:
"""Temporary function to aggregate from realization tables when no aggregated table is available
Assume Sumo will handle this in the future"""
vol_table_collection: TableCollection = self._case.tables.filter(
stage="realization",
name=table_name,
tagname="vol",
iteration=self._iteration_name,
)
if not vol_table_collection:
raise ValueError(f"No volumetric realization tables found {self._case_uuid}, {table_name}")

### Using ThreadPoolExecutor to parallelize the download of the tables

def worker(idx: int) -> pd.DataFrame:
vol_table = vol_table_collection[idx]
print(f"Downloading table: {table_name} for realization {vol_table.realization}")
byte_stream: BytesIO = vol_table.blob

table: pd.DataFrame = pd.read_csv(byte_stream)
table["REAL"] = vol_table.realization
return table

with ThreadPoolExecutor() as executor:
tables = list(executor.map(worker, list(range(len(vol_table_collection)))))
tables = pd.concat(tables)
tables = pa.Table.from_pandas(tables)

return tables

def get_response(
async def get_response(
self,
table_name: str,
column_name: str,
categorical_filters: Optional[List[InplaceVolumetricsCategoricalMetaData]] = None,
realizations: Optional[Sequence[int]] = None,
) -> EnsembleScalarResponse:
"""Retrieve the volumetric response for the given table name and column name"""
table = self.get_table(table_name, column_name)
table = await _load_arrow_table_for_from_sumo(self._case, self._iteration_name, table_name, column_name)
if realizations is not None:
mask = pc.is_in(table["REAL"], value_set=pa.array(realizations))
table = table.filter(mask)
if categorical_filters is not None:
for category in categorical_filters:
mask = pc.is_in(table[category.name], value_set=pa.array(category.unique_values))
table = table.filter(mask)
print(table)

summed_on_real_table = table.group_by("REAL").aggregate([(column_name, "sum")]).sort_by("REAL")

return EnsembleScalarResponse(
realizations=summed_on_real_table["REAL"].to_pylist(),
values=summed_on_real_table[f"{column_name}_sum"].to_pylist(),
)


async def _load_arrow_table_for_from_sumo(
    case: Case, iteration_name: str, table_name: str, column_name: str
) -> pa.Table:
    """Load a single aggregated ("collection") volumetric column from Sumo as a pyarrow Table.

    Filters the case's tables on aggregation="collection", tagname="vol", the given
    iteration/table/column, downloads the matching blob and parses it as Parquet.

    Raises:
        ValueError: if zero, or more than one, matching table is found in Sumo.
    """
    timer = PerfTimer()
    vol_table_collection: TableCollection = case.tables.filter(
        aggregation="collection",
        name=table_name,
        tagname="vol",
        iteration=iteration_name,
        column=column_name,
    )
    # Query the collection length once; each length_async() call may issue a remote request,
    # so awaiting it twice (as before) doubled the round-trips.
    num_tables = await vol_table_collection.length_async()
    if num_tables == 0:
        raise ValueError(f"No volumetric tables found for {case.uuid}, {table_name}, {column_name}")
    if num_tables > 1:
        raise ValueError(f"Multiple volumetric tables found for {case.uuid}, {table_name}, {column_name}")

    vol_table = await vol_table_collection.getitem_async(0)
    # blob_async yields the raw Parquet bytes for the table object.
    byte_stream: BytesIO = await vol_table.blob_async
    table: pa.Table = pq.read_table(byte_stream)
    LOGGER.debug(f"Loaded volumetric table {table_name} from Sumo in: {timer.elapsed_ms()}ms")
    return table

0 comments on commit f797c2c

Please sign in to comment.