Skip to content

Commit

Permalink
wip
Browse files Browse the repository at this point in the history
  • Loading branch information
HansKallekleiv committed Nov 7, 2024
1 parent ce735d7 commit 6bccc04
Showing 1 changed file with 18 additions and 9 deletions.
27 changes: 18 additions & 9 deletions backend_py/primary/primary/services/sumo_access/relperm_access.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
import logging
from io import BytesIO
import asyncio
from typing import List, Optional, Dict, Sequence
from typing import List, Optional, Dict, Sequence, Any
from dataclasses import dataclass
from fmu.sumo.explorer.objects import Case, TableCollection
import polars as pl
Expand Down Expand Up @@ -77,7 +77,12 @@ async def get_single_realization_table(self, table_name: str) -> pl.DataFrame:
self._case._sumo, self._case_uuid, self._iteration_name, table_name
)
single_realization_blob_id = realization_blob_ids[0]
return await self.fetch_realization_table(single_realization_blob_id)
res = await self.fetch_realization_table(single_realization_blob_id)
blob = BytesIO(res.content)
real_df = pl.read_parquet(blob)
# Add realization id to the dataframe
real_df = real_df.with_columns(pl.lit(single_realization_blob_id.realization_id).alias("REAL"))
return real_df

async def get_relperm_table(
self,
Expand All @@ -91,22 +96,26 @@ async def get_relperm_table(
perf_metrics.record_lap("get_relperm_realization_table_blob_uuids")

tasks = [asyncio.create_task(self.fetch_realization_table(table)) for table in realization_blob_ids]
realization_tables = await asyncio.gather(*tasks)

realization_tables_res = await asyncio.gather(*tasks)
perf_metrics.record_lap("fetch_realization_tables")
realization_tables = []
for res, realization_blob_id in zip(realization_tables_res, realization_blob_ids):
blob = BytesIO(res.content)
real_df = pl.read_parquet(blob)
# Add realization id to the dataframe
real_df = real_df.with_columns(pl.lit(realization_blob_id.realization_id).alias("REAL"))
realization_tables.append(real_df)

table = pl.concat(realization_tables)
perf_metrics.record_lap("concat_realization_tables")

LOGGER.debug(f"RelPermAccess.get_relperm_table: {perf_metrics.to_string()}")
return table

async def fetch_realization_table(self, realization_blob_id: RealizationBlobid) -> pl.DataFrame:
async def fetch_realization_table(self, realization_blob_id: RealizationBlobid) -> Any:
res = await self._case._sumo.get_async(f"/objects('{realization_blob_id.blob_name}')/blob")
blob = BytesIO(res.content)
real_df = pl.read_parquet(blob)
# Add realization id to the dataframe
real_df = real_df.with_columns(pl.lit(realization_blob_id.realization_id).alias("REAL"))
return real_df
return res


def has_required_relperm_table_columns(table_name: str, column_names: List[str]) -> bool:
Expand Down

0 comments on commit 6bccc04

Please sign in to comment.